Skip to content

Commit

Permalink
[ML] refactor internal datafeed management (#74018) (#74179)
Browse files Browse the repository at this point in the history
This unifies the concept of object management between datafeeds and anomaly jobs.
  • Loading branch information
benwtrent committed Jun 16, 2021
1 parent ec71240 commit 1f78e9b
Show file tree
Hide file tree
Showing 9 changed files with 589 additions and 473 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@
import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingNamedWritableProvider;
import org.elasticsearch.xpack.ml.datafeed.DatafeedContextProvider;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedRunner;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager;
Expand Down Expand Up @@ -690,7 +691,17 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
threadPool,
client,
notifier,
xContentRegistry);
xContentRegistry,
indexNameExpressionResolver
);
DatafeedManager datafeedManager = new DatafeedManager(
datafeedConfigProvider,
jobConfigProvider,
xContentRegistry,
clusterService,
settings,
client
);

// special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager if ML is enabled
JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager);
Expand Down Expand Up @@ -835,7 +846,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
autodetectProcessManager,
new MlInitializationService(settings, threadPool, clusterService, client, mlAssignmentNotifier),
jobDataCountsPersister,
datafeedRunner,
datafeedRunner,
datafeedManager,
anomalyDetectionAuditor,
dataFrameAnalyticsAuditor,
inferenceAuditor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,24 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportDeleteDatafeedAction extends AcknowledgedTransportMasterNodeAction<DeleteDatafeedAction.Request> {

private final Client client;
private final DatafeedConfigProvider datafeedConfigProvider;
private final ClusterService clusterService;
private final DatafeedManager datafeedManager;
private final PersistentTasksService persistentTasksService;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

Expand All @@ -50,14 +45,13 @@ public TransportDeleteDatafeedAction(Settings settings, TransportService transpo
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client, PersistentTasksService persistentTasksService,
NamedXContentRegistry xContentRegistry) {
DatafeedManager datafeedManager) {
super(DeleteDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
DeleteDatafeedAction.Request::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
this.client = client;
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
this.persistentTasksService = persistentTasksService;
this.clusterService = clusterService;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.datafeedManager = datafeedManager;
}

@Override
Expand All @@ -72,14 +66,15 @@ protected void masterOperation(DeleteDatafeedAction.Request request, ClusterStat
if (request.isForce()) {
forceDeleteDatafeed(request, state, listener);
} else {
deleteDatafeedConfig(request, listener);
datafeedManager.deleteDatafeed(request, state, listener);
}
}

private void forceDeleteDatafeed(DeleteDatafeedAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
ActionListener<Boolean> finalListener = ActionListener.wrap(
response -> deleteDatafeedConfig(request, listener),
// use clusterService.state() here so that the updated state without the task is available
response -> datafeedManager.deleteDatafeed(request, clusterService.state(), listener),
listener::onFailure
);

Expand Down Expand Up @@ -118,37 +113,6 @@ public void onFailure(Exception e) {
}
}

/**
 * Deletes a datafeed's configuration document (the non-force deletion path).
 * <p>
 * Preconditions checked here: the datafeed must not have a running persistent
 * task (i.e. it must be stopped). The deletion then proceeds asynchronously:
 * look up the datafeed config to find its job, delete the datafeed timing
 * stats for that job, and finally delete the datafeed config document itself.
 *
 * @param request  the delete-datafeed request carrying the datafeed id
 * @param listener notified with {@code AcknowledgedResponse.TRUE} on success,
 *                 or with the failure from any step in the chain
 */
private void deleteDatafeedConfig(DeleteDatafeedAction.Request request, ActionListener<AcknowledgedResponse> listener) {
    // Check datafeed is stopped
    PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
    if (MlTasks.getDatafeedTask(request.getDatafeedId(), tasks) != null) {
        // A persistent task exists for this datafeed, so it is still started;
        // refuse with 409 CONFLICT rather than deleting a running datafeed.
        listener.onFailure(ExceptionsHelper.conflictStatusException(
            Messages.getMessage(Messages.DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE, request.getDatafeedId(), DatafeedState.STARTED)));
        return;
    }

    String datafeedId = request.getDatafeedId();

    // Step 1: fetch the datafeed config so we can resolve the owning job id.
    datafeedConfigProvider.getDatafeedConfig(
        datafeedId,
        ActionListener.wrap(
            datafeedConfigBuilder -> {
                String jobId = datafeedConfigBuilder.build().getJobId();
                // Step 2: remove the datafeed timing stats stored against the job's results.
                JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId);
                jobDataDeleter.deleteDatafeedTimingStats(
                    ActionListener.wrap(
                        unused1 -> {
                            // Step 3: delete the datafeed config document itself;
                            // only acknowledge once the config is actually gone.
                            datafeedConfigProvider.deleteDatafeedConfig(
                                datafeedId,
                                ActionListener.wrap(
                                    unused2 -> listener.onResponse(AcknowledgedResponse.TRUE),
                                    listener::onFailure));
                        },
                        listener::onFailure));
            },
            listener::onFailure));
}

@Override
protected ClusterBlockException checkBlock(DeleteDatafeedAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -23,10 +24,9 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
Expand All @@ -46,9 +46,8 @@
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

Expand All @@ -68,8 +67,8 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
private final Client client;
private final PersistentTasksService persistentTasksService;
private final AnomalyDetectionAuditor auditor;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;
private final JobManager jobManager;
private final DatafeedConfigProvider datafeedConfigProvider;
private final MlMemoryTracker memoryTracker;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
Expand All @@ -86,20 +85,20 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
Client client, AnomalyDetectionAuditor auditor, JobResultsProvider jobResultsProvider,
Client client, AnomalyDetectionAuditor auditor,
JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
MlMemoryTracker memoryTracker) {
MlMemoryTracker memoryTracker, JobManager jobManager) {
super(DeleteJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
DeleteJobAction.Request::new, indexNameExpressionResolver, ThreadPool.Names.SAME);
this.client = client;
this.persistentTasksService = persistentTasksService;
this.auditor = auditor;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.memoryTracker = memoryTracker;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.listenersByJobId = new HashMap<>();
this.jobManager = jobManager;
}

@Override
Expand All @@ -121,7 +120,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
return;
}

logger.debug("Deleting job '{}'", request.getJobId());
logger.debug(() -> new ParameterizedMessage("[{}] deleting job ", request.getJobId()));

if (request.isForce() == false) {
checkJobIsNotOpen(request.getJobId(), state);
Expand All @@ -133,8 +132,11 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
// Check if there is a deletion task for this job already and if yes wait for it to complete
synchronized (listenersByJobId) {
if (listenersByJobId.containsKey(request.getJobId())) {
logger.debug("[{}] Deletion task [{}] will wait for existing deletion task to complete",
request.getJobId(), task.getId());
logger.debug(() -> new ParameterizedMessage(
"[{}] Deletion task [{}] will wait for existing deletion task to complete",
request.getJobId(),
task.getId()
));
listenersByJobId.get(request.getJobId()).add(listener);
return;
} else {
Expand All @@ -158,9 +160,9 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
ActionListener<PutJobAction.Response> markAsDeletingListener = ActionListener.wrap(
response -> {
if (request.isForce()) {
forceDeleteJob(parentTaskClient, request, finalListener);
forceDeleteJob(parentTaskClient, request, state, finalListener);
} else {
normalDeleteJob(parentTaskClient, request, finalListener);
normalDeleteJob(parentTaskClient, request, state, finalListener);
}
},
finalListener::onFailure);
Expand All @@ -176,7 +178,7 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
logger.info(
"[{}] config is missing but task exists. Attempting to delete tasks and stop process",
request.getJobId());
forceDeleteJob(parentTaskClient, request, finalListener);
forceDeleteJob(parentTaskClient, request, state, finalListener);
} else {
finalListener.onFailure(e);
}
Expand Down Expand Up @@ -204,64 +206,41 @@ private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @
}
}

private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient,
DeleteJobAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener) {
String jobId = request.getJobId();

// We clean up the memory tracker on delete rather than close as close is not a master node action
memoryTracker.removeAnomalyDetectorJob(jobId);

// Step 4. When the job has been removed from the cluster state, return a response
// -------
CheckedConsumer<Boolean, Exception> apiResponseHandler = jobDeleted -> {
if (jobDeleted) {
logger.info("Job [" + jobId + "] deleted");
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DELETED));
listener.onResponse(AcknowledgedResponse.TRUE);
} else {
listener.onResponse(AcknowledgedResponse.FALSE);
}
};

// Step 3. When the physical storage has been deleted, delete the job config document
// -------
// Don't report an error if the document has already been deleted
CheckedConsumer<Boolean, Exception> deleteJobStateHandler = response -> jobConfigProvider.deleteJob(jobId, false,
ActionListener.wrap(
deleteResponse -> apiResponseHandler.accept(Boolean.TRUE),
listener::onFailure
)
);

// Step 2. Remove the job from any calendars
CheckedConsumer<Boolean, Exception> removeFromCalendarsHandler = response -> jobResultsProvider.removeJobFromCalendars(jobId,
ActionListener.wrap(deleteJobStateHandler::accept, listener::onFailure));


// Step 1. Delete the physical storage
new JobDataDeleter(parentTaskClient, jobId).deleteJobDocuments(
jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure);
jobManager.deleteJob(request, parentTaskClient, state, listener);
}

private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
ActionListener<AcknowledgedResponse> listener) {

logger.debug("Force deleting job [{}]", request.getJobId());
private void forceDeleteJob(
ParentTaskAssigningClient parentTaskClient,
DeleteJobAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener
) {

final ClusterState state = clusterService.state();
final String jobId = request.getJobId();
logger.debug(() -> new ParameterizedMessage("[{}] force deleting job", jobId));

// 3. Delete the job
ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean response) {
normalDeleteJob(parentTaskClient, request, listener);
// use clusterService.state() here so that the updated state without the task is available
normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
}

@Override
public void onFailure(Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
normalDeleteJob(parentTaskClient, request, listener);
// use clusterService.state() here so that the updated state without the task is available
normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
} else {
listener.onFailure(e);
}
Expand All @@ -271,12 +250,12 @@ public void onFailure(Exception e) {
// 2. Cancel the persistent task. This closes the process gracefully so
// the process should be killed first.
ActionListener<KillProcessAction.Response> killJobListener = ActionListener.wrap(
response -> removePersistentTask(request.getJobId(), state, removeTaskListener),
response -> removePersistentTask(jobId, state, removeTaskListener),
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof ElasticsearchStatusException) {
// Killing the process marks the task as completed so it
// may have disappeared when we get here
removePersistentTask(request.getJobId(), state, removeTaskListener);
removePersistentTask(jobId, state, removeTaskListener);
} else {
listener.onFailure(e);
}
Expand Down

0 comments on commit 1f78e9b

Please sign in to comment.