Skip to content

Commit

Permalink
[ML][HLRC] Adds support for reset job api (#74254)
Browse files Browse the repository at this point in the history
Adds HLRC support for the newly added reset anomaly
detection job API.
  • Loading branch information
dimitris-athanasiou committed Jun 18, 2021
1 parent 7a0ae30 commit ae7d9df
Show file tree
Hide file tree
Showing 16 changed files with 635 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.PutTrainedModelAliasRequest;
import org.elasticsearch.client.ml.PutTrainedModelRequest;
import org.elasticsearch.client.ml.ResetJobRequest;
import org.elasticsearch.client.ml.RevertModelSnapshotRequest;
import org.elasticsearch.client.ml.SetUpgradeModeRequest;
import org.elasticsearch.client.ml.StartDataFrameAnalyticsRequest;
Expand Down Expand Up @@ -191,6 +192,23 @@ static Request deleteJob(DeleteJobRequest deleteJobRequest) {
return request;
}

static Request resetJob(ResetJobRequest resetJobRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(resetJobRequest.getJobId())
.addPathPartAsIs("_reset")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params();
if (resetJobRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", Boolean.toString(resetJobRequest.getWaitForCompletion()));
}
request.addParameters(params.asMap());
return request;
}

static Request flushJob(FlushJobRequest flushJobRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ml")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.CloseJobResponse;
import org.elasticsearch.client.ml.DeleteTrainedModelAliasRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.EstimateModelMemoryRequest;
import org.elasticsearch.client.ml.EstimateModelMemoryResponse;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
import org.elasticsearch.client.ml.DeleteCalendarRequest;
Expand All @@ -29,8 +23,14 @@
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.DeleteModelSnapshotRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelAliasRequest;
import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
import org.elasticsearch.client.ml.EstimateModelMemoryRequest;
import org.elasticsearch.client.ml.EstimateModelMemoryResponse;
import org.elasticsearch.client.ml.EvaluateDataFrameRequest;
import org.elasticsearch.client.ml.EvaluateDataFrameResponse;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
Expand Down Expand Up @@ -93,6 +93,8 @@
import org.elasticsearch.client.ml.PutTrainedModelAliasRequest;
import org.elasticsearch.client.ml.PutTrainedModelRequest;
import org.elasticsearch.client.ml.PutTrainedModelResponse;
import org.elasticsearch.client.ml.ResetJobRequest;
import org.elasticsearch.client.ml.ResetJobResponse;
import org.elasticsearch.client.ml.RevertModelSnapshotRequest;
import org.elasticsearch.client.ml.RevertModelSnapshotResponse;
import org.elasticsearch.client.ml.SetUpgradeModeRequest;
Expand Down Expand Up @@ -611,6 +613,46 @@ public RevertModelSnapshotResponse revertModelSnapshot(RevertModelSnapshotReques
Collections.emptySet());
}

/**
* Resets the given Machine Learning Job
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-reset-job.html">ML Reset job documentation</a>
*
* @param request The request to reset the job
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for
* completion
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public ResetJobResponse resetJob(ResetJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::resetJob,
options,
ResetJobResponse::fromXContent,
Collections.emptySet());
}

/**
* Resets the given Machine Learning Job asynchronously and notifies the listener on completion
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-reset-job.html">ML Reset Job documentation</a>
*
* @param request The request to reset the job
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable resetJobAsync(ResetJobRequest request, RequestOptions options, ActionListener<ResetJobResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::resetJob,
options,
ResetJobResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Reverts to a particular Machine Learning Model Snapshot asynchronously and notifies the listener on completion
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;

import java.util.Objects;

/**
* Request to delete a Machine Learning Job via its ID
*/
public class ResetJobRequest extends ActionRequest {

private String jobId;
private Boolean waitForCompletion;

public ResetJobRequest(String jobId) {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
}

public String getJobId() {
return jobId;
}

/**
* The jobId which to reset
* @param jobId unique jobId to reset, must not be null
*/
public void setJobId(String jobId) {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
}

public Boolean getWaitForCompletion() {
return waitForCompletion;
}

/**
* Set whether this request should wait until the operation has completed before returning
* @param waitForCompletion When {@code true} the call will wait for the job reset to complete.
* Otherwise, the reset will be executed asynchronously and the response
* will contain the task id.
*/
public void setWaitForCompletion(Boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public int hashCode() {
return Objects.hash(jobId, waitForCompletion);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || obj.getClass() != getClass()) {
return false;
}

ResetJobRequest other = (ResetJobRequest) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(waitForCompletion, other.waitForCompletion);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Objects;

/**
* Response object that contains the acknowledgement or the task id
* depending on whether the reset job action was requested to wait for completion.
*/
public class ResetJobResponse implements ToXContentObject {

private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
private static final ParseField TASK = new ParseField("task");

public static final ConstructingObjectParser<ResetJobResponse, Void> PARSER = new ConstructingObjectParser<>("reset_job_response",
true, a-> new ResetJobResponse((Boolean) a[0], (TaskId) a[1]));

static {
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING);
}

public static ResetJobResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

private final Boolean acknowledged;
private final TaskId task;

ResetJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) {
assert acknowledged != null || task != null;
this.acknowledged = acknowledged;
this.task = task;
}

/**
* Get the action acknowledgement
* @return {@code null} when the request had {@link ResetJobRequest#getWaitForCompletion()} set to {@code false} or
* otherwise a {@code boolean} that indicates whether the job was reset successfully.
*/
public Boolean getAcknowledged() {
return acknowledged;
}

/**
* Get the task id
* @return {@code null} when the request had {@link ResetJobRequest#getWaitForCompletion()} set to {@code true} or
* otherwise the id of the job reset task.
*/
public TaskId getTask() {
return task;
}

@Override
public int hashCode() {
return Objects.hash(acknowledged, task);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

ResetJobResponse that = (ResetJobResponse) other;
return Objects.equals(acknowledged, that.acknowledged) && Objects.equals(task, that.task);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (acknowledged != null) {
builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged);
}
if (task != null) {
builder.field(TASK.getPreferredName(), task.toString());
}
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.client.ml.job.config;

import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;

public class Blocked implements ToXContentObject {

public enum Reason {
NONE, DELETE, RESET, REVERT;

public static Reason fromString(String value) {
return Reason.valueOf(value.toUpperCase(Locale.ROOT));
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

public static final ParseField REASON = new ParseField("reason");
public static final ParseField TASK_ID = new ParseField("task_id");

public static final ConstructingObjectParser<Blocked, Void> PARSER = new ConstructingObjectParser<>("blocked", true,
a -> new Blocked((Reason) a[0], (TaskId) a[1]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Reason::fromString, REASON);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TaskId::new, TASK_ID);
}

private final Reason reason;

@Nullable
private final TaskId taskId;

public Blocked(Reason reason, @Nullable TaskId taskId) {
this.reason = Objects.requireNonNull(reason);
this.taskId = taskId;
}

public Reason getReason() {
return reason;
}

@Nullable
public TaskId getTaskId() {
return taskId;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(REASON.getPreferredName(), reason);
if (taskId != null) {
builder.field(TASK_ID.getPreferredName(), taskId.toString());
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(reason, taskId);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Blocked that = (Blocked) o;
return Objects.equals(reason, that.reason) && Objects.equals(taskId, that.taskId);
}
}

0 comments on commit ae7d9df

Please sign in to comment.