Skip to content

Commit

Permalink
[ML] Tone down ML unassigned job notifications (#79578)
Browse files Browse the repository at this point in the history
When an ML node is restarted the ML jobs that were running
on it often spend a short period unassigned while the work
to assign them to a new node is performed. We used to generate
warning notifications for such jobs while they were unassigned,
which caused unnecessary worry. The warning notifications cause
a yellow warning triangle in the UI jobs list.

This PR changes the ML job assignment notifications so that,
instead of a warning notification for every single reason for
being unassigned that a job cycles through before it is assigned,
there will just be info messages for assignment and unassignment.

Once per day we will still audit unassigned jobs and generate
the same warning messages as before. These messages should be
sufficient to report jobs that cannot be assigned for long
periods due to lack of capacity or some other non-transient
problem.

Fixes #79270
  • Loading branch information
droberts195 committed Oct 21, 2021
1 parent 9b92926 commit 947e365
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws
assertThatAuditMessagesMatch(id,
"Created analytics with type [outlier_detection]",
"Estimated memory usage [",
"No node found to start analytics. Reasons [persistent task is awaiting node assignment.]",
"Job requires at least [1tb] free memory on a machine learning capable node to run",
"Started analytics",
"Stopped analytics");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
clusterService, datafeedRunner, mlController, autodetectProcessManager, dataFrameAnalyticsManager, memoryTracker);
this.mlLifeCycleService.set(mlLifeCycleService);
MlAssignmentNotifier mlAssignmentNotifier = new MlAssignmentNotifier(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool,
new MlConfigMigrator(settings, client, clusterService, indexNameExpressionResolver), clusterService);
clusterService);

MlAutoUpdateService mlAutoUpdateService = new MlAutoUpdateService(threadPool,
List.of(new DatafeedConfigAutoUpdater(datafeedConfigProvider, indexNameExpressionResolver)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
Expand All @@ -33,14 +33,12 @@ public class MlAssignmentNotifier implements ClusterStateListener {

private final AnomalyDetectionAuditor anomalyDetectionAuditor;
private final DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor;
private final MlConfigMigrator mlConfigMigrator;
private final ThreadPool threadPool;

MlAssignmentNotifier(AnomalyDetectionAuditor anomalyDetectionAuditor, DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor,
ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
ThreadPool threadPool, ClusterService clusterService) {
this.anomalyDetectionAuditor = anomalyDetectionAuditor;
this.dataFrameAnalyticsAuditor = dataFrameAnalyticsAuditor;
this.mlConfigMigrator = mlConfigMigrator;
this.threadPool = threadPool;
clusterService.addListener(this);
}
Expand All @@ -56,29 +54,23 @@ public void clusterChanged(ClusterChangedEvent event) {
return;
}

mlConfigMigrator.migrateConfigs(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event));
}
));
}

private void auditChangesToMlTasks(ClusterChangedEvent event) {

if (event.metadataChanged() == false) {
return;
}

threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event));
}

private void auditChangesToMlTasks(ClusterChangedEvent event) {

PersistentTasksCustomMetadata previousTasks = event.previousState().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata currentTasks = event.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);

if (Objects.equals(previousTasks, currentTasks)) {
return;
}

auditMlTasks(event.state().nodes(), previousTasks, currentTasks, false);
auditMlTasks(event.previousState().nodes(), event.state().nodes(), previousTasks, currentTasks, false);
}

/**
Expand All @@ -87,10 +79,11 @@ private void auditChangesToMlTasks(ClusterChangedEvent event) {
* Care must be taken not to call this method frequently.
*/
public void auditUnassignedMlTasks(DiscoveryNodes nodes, PersistentTasksCustomMetadata tasks) {
auditMlTasks(nodes, tasks, tasks, true);
auditMlTasks(nodes, nodes, tasks, tasks, true);
}

private void auditMlTasks(DiscoveryNodes nodes, PersistentTasksCustomMetadata previousTasks, PersistentTasksCustomMetadata currentTasks,
private void auditMlTasks(DiscoveryNodes previousNodes, DiscoveryNodes currentNodes,
PersistentTasksCustomMetadata previousTasks, PersistentTasksCustomMetadata currentTasks,
boolean alwaysAuditUnassigned) {

for (PersistentTask<?> currentTask : currentTasks.tasks()) {
Expand All @@ -103,45 +96,67 @@ private void auditMlTasks(DiscoveryNodes nodes, PersistentTasksCustomMetadata pr
(isTaskAssigned || alwaysAuditUnassigned == false)) {
continue;
}
boolean wasTaskAssigned = (previousAssignment != null) && (previousAssignment.getExecutorNode() != null);

if (MlTasks.JOB_TASK_NAME.equals(currentTask.getTaskName())) {
String jobId = ((OpenJobAction.JobParams) currentTask.getParams()).getJobId();
if (isTaskAssigned) {
DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode());
anomalyDetectionAuditor.info(jobId, "Opening job on node [" + node.toString() + "]");
} else {
String nodeName = nodeName(currentNodes, currentAssignment.getExecutorNode());
anomalyDetectionAuditor.info(jobId, "Opening job on node [" + nodeName + "]");
} else if (alwaysAuditUnassigned) {
anomalyDetectionAuditor.warning(jobId,
"No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
} else if (wasTaskAssigned) {
String nodeName = nodeName(previousNodes, previousAssignment.getExecutorNode());
anomalyDetectionAuditor.info(jobId, "Job unassigned from node [" + nodeName + "]");
}
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams();
String jobId = datafeedParams.getJobId();
if (isTaskAssigned) {
DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode());
if (jobId != null) {
if (jobId != null) {
if (isTaskAssigned) {
String nodeName = nodeName(currentNodes, currentAssignment.getExecutorNode());
anomalyDetectionAuditor.info(jobId,
"Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
}
} else {
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
currentAssignment.getExplanation() + "]";
if (alwaysAuditUnassigned == false) {
logger.warn("[{}] {}", jobId, msg);
}
if (jobId != null) {
anomalyDetectionAuditor.warning(jobId, msg);
"Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + nodeName + "]");
} else if (alwaysAuditUnassigned) {
anomalyDetectionAuditor.warning(jobId,
"No node found to start datafeed [" + datafeedParams.getDatafeedId() + "]. Reasons [" +
currentAssignment.getExplanation() + "]");
} else if (wasTaskAssigned) {
String nodeName = nodeName(previousNodes, previousAssignment.getExecutorNode());
anomalyDetectionAuditor.info(jobId,
"Datafeed [" + datafeedParams.getDatafeedId() + "] unassigned from node [" + nodeName + "]");
} else {
logger.warn("[{}] No node found to start datafeed [{}]. Reasons [{}]",
jobId, datafeedParams.getDatafeedId(), currentAssignment.getExplanation());
}
}
} else if (MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME.equals(currentTask.getTaskName())) {
String id = ((StartDataFrameAnalyticsAction.TaskParams) currentTask.getParams()).getId();
if (isTaskAssigned) {
DiscoveryNode node = nodes.get(currentAssignment.getExecutorNode());
dataFrameAnalyticsAuditor.info(id, "Starting analytics on node [" + node.toString() + "]");
} else {
String nodeName = nodeName(currentNodes, currentAssignment.getExecutorNode());
dataFrameAnalyticsAuditor.info(id, "Starting analytics on node [" + nodeName + "]");
} else if (alwaysAuditUnassigned) {
dataFrameAnalyticsAuditor.warning(id,
"No node found to start analytics. Reasons [" + currentAssignment.getExplanation() + "]");
} else if (wasTaskAssigned) {
String nodeName = nodeName(previousNodes, previousAssignment.getExecutorNode());
// Data frame analytics tasks are audited through the analytics auditor, like the
// other messages in this branch; the id here is an analytics id, not a job id.
dataFrameAnalyticsAuditor.info(id, "Analytics unassigned from node [" + nodeName + "]");
}
}
}
}

/**
 * Chooses the text used to identify a node in audit messages.
 *
 * @param nodes  the nodes to look the ID up in
 * @param nodeId the ID of the node being reported on
 * @return the node's name when the node is found in {@code nodes} and
 *         {@code Strings.hasLength} accepts its name, otherwise {@code nodeId}
 */
static String nodeName(DiscoveryNodes nodes, String nodeId) {
    final DiscoveryNode discoveryNode = nodes.get(nodeId);
    if (discoveryNode == null) {
        // The node may have left the cluster in an earlier cluster state
        // update, so the state we were given no longer records its friendly
        // name. All we can report in that case is the ID.
        return nodeId;
    }
    // Tests that don't bother to name their nodes also end up with the ID.
    final String friendlyName = discoveryNode.getName();
    return Strings.hasLength(friendlyName) ? friendlyName : nodeId;
}
}

0 comments on commit 947e365

Please sign in to comment.