Skip to content

Commit

Permalink
[ML] Ensure mappings are up to date before reverting state (#68767)
Browse files Browse the repository at this point in the history
When a model snapshot is reverted the job document in the
config index needs to be updated. Before this is done it
is essential to ensure the config index mappings are up
to date, otherwise we will try to create dynamic mappings
for fields newly added to the job config.

Backport of #68746
  • Loading branch information
droberts195 committed Feb 10, 2021
1 parent 65a0e35 commit c1d43fa
Showing 1 changed file with 23 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
Expand Down Expand Up @@ -71,30 +73,31 @@ public TransportRevertModelSnapshotAction(Settings settings, ThreadPool threadPo
@Override
protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state,
ActionListener<RevertModelSnapshotAction.Response> listener) {
if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", request.getJobId()));
final String jobId = request.getJobId();

if (migrationEligibilityCheck.jobIsEligibleForMigration(jobId, state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", jobId));
return;
}

logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());

request.getSnapshotId(), jobId, request.getDeleteInterveningResults());

// 3. Revert the state
ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
exists -> {
// 4. Revert the state
ActionListener<Boolean> configMappingUpdateListener = ActionListener.wrap(
r -> {
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);
JobState jobState = MlTasks.getJobState(jobId, tasks);

if (request.isForce() == false && jobState.equals(JobState.CLOSED) == false) {
listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)));
return;
}

if (MlTasks.getSnapshotUpgraderTask(request.getJobId(), request.getSnapshotId(), tasks) != null) {
if (MlTasks.getSnapshotUpgraderTask(jobId, request.getSnapshotId(), tasks) != null) {
listener.onFailure(ExceptionsHelper.conflictStatusException(
"Cannot revert job [{}] to snapshot [{}] as it is being upgraded",
request.getJobId(),
jobId,
request.getSnapshotId()
));
return;
Expand All @@ -103,20 +106,26 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste
getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
if (request.getDeleteInterveningResults()) {
wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, jobId);
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, jobId);
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, jobId);
}
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
}, listener::onFailure);
},
listener::onFailure
);

// 3. Ensure the config index mappings are up to date
ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
r -> ElasticsearchMappings.addDocMappingIfMissing(MlConfigIndex.indexName(), MlConfigIndex::mapping,
client, state, configMappingUpdateListener),
listener::onFailure
);

// 2. Verify the job exists
ActionListener<Boolean> createStateIndexListener = ActionListener.wrap(
r -> jobManager.jobExists(request.getJobId(), jobExistsListener),
r -> jobManager.jobExists(jobId, jobExistsListener),
listener::onFailure
);

Expand Down

0 comments on commit c1d43fa

Please sign in to comment.