Skip to content

Commit

Permalink
[7.17] [ML] Improve cleanup for model snapshot upgrades (#82022)
Browse files Browse the repository at this point in the history
* [7.17] [ML] Improve cleanup for model snapshot upgrades

If a model snapshot upgrade persistent task is cancelled then
we now kill any associated C++ process. Previously the C++ process
could hang indefinitely.

Additionally, ML feature reset now cancels any in-progress model
snapshot upgrades before cleaning up job data, and deleting an
anomaly detection job cancels any in-progress model snapshot
upgrades associated with that job before cleaning up the job's
data.

Backport of #81831

* Formatting
  • Loading branch information
droberts195 committed Dec 22, 2021
1 parent 3cdae7e commit 48a1fba
Show file tree
Hide file tree
Showing 11 changed files with 548 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.regex.Regex;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -175,6 +176,47 @@ public boolean isOnlyExact() {
return onlyExact;
}

/**
* A simple matcher with one purpose to test whether an id
* matches a expression that may contain wildcards.
* Use the {@link #idMatches(String)} function to
* test if the given id is matched by any of the matchers.
*
* Unlike {@link ExpandedIdsMatcher} there is no
* allowNoMatchForWildcards logic and the matchers
* are not be removed once they have been matched.
*/
public static class SimpleIdsMatcher {

private final List<IdMatcher> matchers;

public SimpleIdsMatcher(String[] tokens) {

if (Strings.isAllOrWildcard(tokens)) {
matchers = Collections.singletonList(new WildcardMatcher("*"));
return;
}

matchers = Arrays.stream(tokens)
.map(token -> Regex.isSimpleMatchPattern(token) ? new WildcardMatcher(token) : new EqualsIdMatcher(token))
.collect(Collectors.toList());
}

public SimpleIdsMatcher(String expression) {
this(tokenizeExpression(expression));
}

/**
* Do any of the matchers match {@code id}?
*
* @param id Id to test
* @return True if the given id is matched by any of the matchers
*/
public boolean idMatches(String id) {
return matchers.stream().anyMatch(idMatcher -> idMatcher.matches(id));
}
}

private abstract static class IdMatcher {
protected final String id;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Objects;

public class CancelJobModelSnapshotUpgradeAction extends ActionType<CancelJobModelSnapshotUpgradeAction.Response> {

public static final CancelJobModelSnapshotUpgradeAction INSTANCE = new CancelJobModelSnapshotUpgradeAction();

// Even though at the time of writing this action doesn't have a REST endpoint the action name is
// still "admin" rather than "internal". This is because there's no conceptual reason why this
// action couldn't have a REST endpoint in the future, and it's painful to change these action
// names after release. The only difference is that in 7.17 the last remaining transport client
// users will be able to call this endpoint. In 8.x there is no transport client, so in 8.x there
// is no difference between having "admin" and "internal" here in the period before a REST endpoint
// exists. Using "admin" just makes life easier if we ever decide to add a REST endpoint in the
// future.
public static final String NAME = "cluster:admin/xpack/ml/job/model_snapshots/upgrade/cancel";

private CancelJobModelSnapshotUpgradeAction() {
super(NAME, Response::new);
}

public static class Request extends ActionRequest implements ToXContentObject {

public static final String ALL = "_all";

public static final ParseField SNAPSHOT_ID = new ParseField("snapshot_id");
public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");

static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);

static {
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareString(Request::setSnapshotId, SNAPSHOT_ID);
PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH);
}

private String jobId = ALL;
private String snapshotId = ALL;
private boolean allowNoMatch = true;

public Request() {}

public Request(String jobId, String snapshotId) {
setJobId(jobId);
setSnapshotId(snapshotId);
}

public Request(StreamInput in) throws IOException {
super(in);
jobId = in.readString();
snapshotId = in.readString();
allowNoMatch = in.readBoolean();
}

public final Request setJobId(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID);
return this;
}

public String getJobId() {
return jobId;
}

public final Request setSnapshotId(String snapshotId) {
this.snapshotId = ExceptionsHelper.requireNonNull(snapshotId, Job.ID);
return this;
}

public String getSnapshotId() {
return snapshotId;
}

public boolean allowNoMatch() {
return allowNoMatch;
}

public Request setAllowNoMatch(boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeString(snapshotId);
out.writeBoolean(allowNoMatch);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(Job.ID.getPreferredName(), jobId)
.field(SNAPSHOT_ID.getPreferredName(), snapshotId)
.field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch)
.endObject();
}

@Override
public int hashCode() {
return Objects.hash(jobId, snapshotId, allowNoMatch);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(snapshotId, other.snapshotId) && allowNoMatch == other.allowNoMatch;
}

@Override
public String toString() {
return Strings.toString(this);
}
}

public static class Response extends ActionResponse implements Writeable, ToXContentObject {

private final boolean cancelled;

public Response(boolean cancelled) {
this.cancelled = cancelled;
}

public Response(StreamInput in) throws IOException {
cancelled = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(cancelled);
}

public boolean isCancelled() {
return cancelled;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("cancelled", cancelled);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return cancelled == response.cancelled;
}

@Override
public int hashCode() {
return Objects.hash(cancelled);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Request;

public class CancelJobModelSnapshotUpgradeActionRequestTests extends AbstractSerializingTestCase<Request> {

@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20));
if (randomBoolean()) {
request.setAllowNoMatch(randomBoolean());
}
return request;
}

@Override
protected boolean supportsUnknownFields() {
return false;
}

@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}

@Override
protected Request doParseInstance(XContentParser parser) {
return Request.PARSER.apply(parser, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Response;

public class CancelJobModelSnapshotUpgradeActionResponseTests extends AbstractWireSerializingTestCase<Response> {

@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}

@Override
protected Writeable.Reader<Response> instanceReader() {
return CancelJobModelSnapshotUpgradeAction.Response::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
Expand Down Expand Up @@ -164,6 +165,7 @@
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.ml.action.TransportCancelJobModelSnapshotUpgradeAction;
import org.elasticsearch.xpack.ml.action.TransportCloseJobAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarEventAction;
Expand Down Expand Up @@ -1250,6 +1252,7 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetTrainedModelsStatsAction.INSTANCE, TransportGetTrainedModelsStatsAction.class),
new ActionHandler<>(PutTrainedModelAction.INSTANCE, TransportPutTrainedModelAction.class),
new ActionHandler<>(UpgradeJobModelSnapshotAction.INSTANCE, TransportUpgradeJobModelSnapshotAction.class),
new ActionHandler<>(CancelJobModelSnapshotUpgradeAction.INSTANCE, TransportCancelJobModelSnapshotUpgradeAction.class),
new ActionHandler<>(GetJobModelSnapshotsUpgradeStatsAction.INSTANCE, TransportGetJobModelSnapshotsUpgradeStatsAction.class),
new ActionHandler<>(PutTrainedModelAliasAction.INSTANCE, TransportPutTrainedModelAliasAction.class),
new ActionHandler<>(DeleteTrainedModelAliasAction.INSTANCE, TransportDeleteTrainedModelAliasAction.class),
Expand Down Expand Up @@ -1668,16 +1671,28 @@ public void cleanUpFeature(
}, unsetResetModeListener::onFailure);

// Stop data feeds
ActionListener<CancelJobModelSnapshotUpgradeAction.Response> cancelSnapshotUpgradesListener = ActionListener.wrap(
cancelUpgradesResponse -> {
StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
client.execute(
StopDatafeedAction.INSTANCE,
stopDatafeedsReq,
ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> {
logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
})
);
},
unsetResetModeListener::onFailure
);

// Cancel model snapshot upgrades
ActionListener<AcknowledgedResponse> pipelineValidation = ActionListener.wrap(acknowledgedResponse -> {
StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
client.execute(
StopDatafeedAction.INSTANCE,
stopDatafeedsReq,
ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> {
logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
})
CancelJobModelSnapshotUpgradeAction.Request cancelSnapshotUpgradesReq = new CancelJobModelSnapshotUpgradeAction.Request(
"_all",
"_all"
);
client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, cancelSnapshotUpgradesListener);
}, unsetResetModeListener::onFailure);

// validate no pipelines are using machine learning models
Expand Down

0 comments on commit 48a1fba

Please sign in to comment.