Skip to content

Commit

Permalink
During ML maintenance, reset jobs in the reset state without a corres…
Browse files Browse the repository at this point in the history
…ponding task. (#106062)

* During ML maintenance, reset jobs in the reset state without a corresponding task.

* Update docs/changelog/106062.yaml

* Fix race condition in MlDailyMaintenanceServiceTests

* Fix log level
  • Loading branch information
jan-elastic committed Mar 8, 2024
1 parent 321c4e1 commit 14f7151
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 105 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/106062.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 106062
summary: "During ML maintenance, reset jobs in the reset state without a corresponding\
\ task"
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@ public boolean isDeleting() {
return deleting;
}

public boolean isResetting() {
return blocked != null && Blocked.Reason.RESET.equals(blocked.getReason());
}

public boolean allowLazyOpen() {
return allowLazyOpen;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
Expand All @@ -26,12 +28,15 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.TypedChainTaskExecutor;

Expand All @@ -41,6 +46,8 @@
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -205,24 +212,34 @@ private void triggerTasks() {
}

private void triggerAnomalyDetectionMaintenance() {
// Step 3: Log any error that could have happened
// Step 4: Log any error that could have happened
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
unused -> {},
e -> logger.error("An error occurred during [ML] maintenance tasks execution", e)
e -> logger.warn("An error occurred during [ML] maintenance tasks execution", e)
);

// Step 2: Delete expired data
// Step 3: Delete expired data
ActionListener<AcknowledgedResponse> deleteJobsListener = ActionListener.wrap(
unused -> triggerDeleteExpiredDataTask(finalListener),
e -> {
logger.info("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e);
// Note: Steps 1 and 2 are independent of each other and step 2 is executed even if step 1 failed.
logger.warn("[ML] maintenance task: triggerResetJobsInStateResetWithoutResetTask failed", e);
// Note: Steps 1-3 are independent, so continue upon errors.
triggerDeleteExpiredDataTask(finalListener);
}
);

// Step 1: Delete jobs that are in deleting state
triggerDeleteJobsInStateDeletingWithoutDeletionTask(deleteJobsListener);
// Step 2: Reset jobs that are in resetting state without task
ActionListener<AcknowledgedResponse> resetJobsListener = ActionListener.wrap(
unused -> triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener),
e -> {
logger.warn("[ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask failed", e);
// Note: Steps 1-3 are independent, so continue upon errors.
triggerResetJobsInStateResetWithoutResetTask(deleteJobsListener);
}
);

// Step 1: Delete jobs that are in deleting state without task
triggerDeleteJobsInStateDeletingWithoutDeletionTask(resetJobsListener);
}

private void triggerDataFrameAnalyticsMaintenance() {
Expand Down Expand Up @@ -256,73 +273,111 @@ private void triggerDeleteExpiredDataTask(ActionListener<AcknowledgedResponse> f

// Visible for testing
public void triggerDeleteJobsInStateDeletingWithoutDeletionTask(ActionListener<AcknowledgedResponse> finalListener) {
SetOnce<Set<String>> jobsInStateDeletingHolder = new SetOnce<>();

ActionListener<List<Tuple<DeleteJobAction.Request, AcknowledgedResponse>>> deleteJobsActionListener = finalListener
.delegateFailureAndWrap((delegate, deleteJobsResponses) -> {
List<String> jobIds = deleteJobsResponses.stream()
.filter(t -> t.v2().isAcknowledged() == false)
.map(Tuple::v1)
.map(DeleteJobAction.Request::getJobId)
.collect(toList());
triggerJobsInStateWithoutMatchingTask(
"triggerDeleteJobsInStateDeletingWithoutDeletionTask",
Job::isDeleting,
DeleteJobAction.NAME,
taskInfo -> stripPrefixOrNull(taskInfo.description(), DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX),
DeleteJobAction.INSTANCE,
DeleteJobAction.Request::new,
finalListener
);
}

public void triggerResetJobsInStateResetWithoutResetTask(ActionListener<AcknowledgedResponse> finalListener) {
triggerJobsInStateWithoutMatchingTask(
"triggerResetJobsInStateResetWithoutResetTask",
Job::isResetting,
ResetJobAction.NAME,
taskInfo -> stripPrefixOrNull(taskInfo.description(), MlTasks.JOB_TASK_ID_PREFIX),
ResetJobAction.INSTANCE,
ResetJobAction.Request::new,
finalListener
);
}

/**
* @return If the string starts with the prefix, this returns the string without the prefix.
* Otherwise, this return null.
*/
private static String stripPrefixOrNull(String str, String prefix) {
return str == null || str.startsWith(prefix) == false ? null : str.substring(prefix.length());
}

/**
* Executes a request for each job in a state, while missing the corresponding task. This
* usually indicates the node originally executing the task has died, so retry the request.
*
* @param maintenanceTaskName Name of ML maintenance task; used only for logging.
* @param jobFilter Predicate for filtering the jobs.
* @param taskActionName Action name of the tasks corresponding to the jobs.
* @param jobIdExtractor Function to extract the job ID from the task info (in order to match to the job).
* @param actionType Action type of the request that should be (re)executed.
* @param requestCreator Function to create the request from the job ID.
* @param finalListener Listener that captures the final response.
*/
private void triggerJobsInStateWithoutMatchingTask(
String maintenanceTaskName,
Predicate<Job> jobFilter,
String taskActionName,
Function<TaskInfo, String> jobIdExtractor,
ActionType<AcknowledgedResponse> actionType,
Function<String, AcknowledgedRequest<?>> requestCreator,
ActionListener<AcknowledgedResponse> finalListener
) {
SetOnce<Set<String>> jobsInStateHolder = new SetOnce<>();

ActionListener<List<Tuple<String, AcknowledgedResponse>>> jobsActionListener = finalListener.delegateFailureAndWrap(
(delegate, jobsResponses) -> {
List<String> jobIds = jobsResponses.stream().filter(t -> t.v2().isAcknowledged() == false).map(Tuple::v1).collect(toList());
if (jobIds.isEmpty()) {
logger.info("Successfully completed [ML] maintenance task: triggerDeleteJobsInStateDeletingWithoutDeletionTask");
logger.info("Successfully completed [ML] maintenance task: {}", maintenanceTaskName);
} else {
logger.info("The following ML jobs could not be deleted: [" + String.join(",", jobIds) + "]");
logger.info("[ML] maintenance task {} failed for jobs: {}", maintenanceTaskName, jobIds);
}
delegate.onResponse(AcknowledgedResponse.TRUE);
});
}
);

ActionListener<ListTasksResponse> listTasksActionListener = ActionListener.wrap(listTasksResponse -> {
Set<String> jobsInStateDeleting = jobsInStateDeletingHolder.get();
Set<String> jobsWithDeletionTask = listTasksResponse.getTasks()
.stream()
.filter(t -> t.description() != null)
.filter(t -> t.description().startsWith(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX))
.map(t -> t.description().substring(DeleteJobAction.DELETION_TASK_DESCRIPTION_PREFIX.length()))
.collect(toSet());
Set<String> jobsInStateDeletingWithoutDeletionTask = Sets.difference(jobsInStateDeleting, jobsWithDeletionTask);
if (jobsInStateDeletingWithoutDeletionTask.isEmpty()) {
Set<String> jobsInState = jobsInStateHolder.get();
Set<String> jobsWithTask = listTasksResponse.getTasks().stream().map(jobIdExtractor).filter(Objects::nonNull).collect(toSet());
Set<String> jobsInStateWithoutTask = Sets.difference(jobsInState, jobsWithTask);
if (jobsInStateWithoutTask.isEmpty()) {
finalListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
TypedChainTaskExecutor<Tuple<DeleteJobAction.Request, AcknowledgedResponse>> chainTaskExecutor = new TypedChainTaskExecutor<>(
TypedChainTaskExecutor<Tuple<String, AcknowledgedResponse>> chainTaskExecutor = new TypedChainTaskExecutor<>(
EsExecutors.DIRECT_EXECUTOR_SERVICE,
unused -> true,
unused -> true
);
for (String jobId : jobsInStateDeletingWithoutDeletionTask) {
DeleteJobAction.Request request = new DeleteJobAction.Request(jobId);
for (String jobId : jobsInStateWithoutTask) {
chainTaskExecutor.add(
listener -> executeAsyncWithOrigin(
client,
ML_ORIGIN,
DeleteJobAction.INSTANCE,
request,
listener.delegateFailureAndWrap((l, response) -> l.onResponse(Tuple.tuple(request, response)))
actionType,
requestCreator.apply(jobId),
listener.delegateFailureAndWrap((l, response) -> l.onResponse(Tuple.tuple(jobId, response)))
)
);
}
chainTaskExecutor.execute(deleteJobsActionListener);
chainTaskExecutor.execute(jobsActionListener);
}, finalListener::onFailure);

ActionListener<GetJobsAction.Response> getJobsActionListener = ActionListener.wrap(getJobsResponse -> {
Set<String> jobsInStateDeleting = getJobsResponse.getResponse()
.results()
.stream()
.filter(Job::isDeleting)
.map(Job::getId)
.collect(toSet());
if (jobsInStateDeleting.isEmpty()) {
Set<String> jobsInState = getJobsResponse.getResponse().results().stream().filter(jobFilter).map(Job::getId).collect(toSet());
if (jobsInState.isEmpty()) {
finalListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
jobsInStateDeletingHolder.set(jobsInStateDeleting);
jobsInStateHolder.set(jobsInState);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
TransportListTasksAction.TYPE,
new ListTasksRequest().setActions(DeleteJobAction.NAME),
new ListTasksRequest().setActions(taskActionName),
listTasksActionListener
);
}, finalListener::onFailure);
Expand Down

0 comments on commit 14f7151

Please sign in to comment.