Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] adding datafeed_config to job in high level rest client #75338

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.client.ml.datafeed;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -43,6 +42,7 @@
public class DatafeedConfig implements ToXContentObject {

public static final ParseField ID = new ParseField("datafeed_id");
public static final ParseField JOB_ID = new ParseField("job_id");
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField INDEXES = new ParseField("indexes");
Expand All @@ -61,7 +61,7 @@ public class DatafeedConfig implements ToXContentObject {

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), JOB_ID);

PARSER.declareStringArray(Builder::setIndices, INDEXES);
PARSER.declareStringArray(Builder::setIndices, INDICES);
Expand Down Expand Up @@ -189,7 +189,7 @@ public Map<String, Object> getRuntimeMappings() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ID.getPreferredName(), id);
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(JOB_ID.getPreferredName(), jobId);
if (queryDelay != null) {
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
}
Expand Down Expand Up @@ -312,7 +312,7 @@ public static class Builder {

public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
this.jobId = Objects.requireNonNull(jobId, Job.ID.getPreferredName());
this.jobId = Objects.requireNonNull(jobId, JOB_ID.getPreferredName());
}

public Builder(DatafeedConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.client.ml.job.config;

import org.elasticsearch.client.common.TimeUtil;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -24,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* This class represents a configured and created Job. The creation time is set
Expand Down Expand Up @@ -60,6 +62,7 @@ public class Job implements ToXContentObject {
public static final ParseField DELETING = new ParseField("deleting");
public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open");
public static final ParseField BLOCKED = new ParseField("blocked");
public static final ParseField DATAFEED_CONFIG = new ParseField("datafeed_config");

public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);

Expand Down Expand Up @@ -92,6 +95,7 @@ public class Job implements ToXContentObject {
PARSER.declareBoolean(Builder::setDeleting, DELETING);
PARSER.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN);
PARSER.declareObject(Builder::setBlocked, Blocked.PARSER, BLOCKED);
PARSER.declareObject(Builder::setDatafeed, DatafeedConfig.PARSER, DATAFEED_CONFIG);
}

private final String jobId;
Expand All @@ -116,14 +120,15 @@ public class Job implements ToXContentObject {
private final Boolean deleting;
private final Boolean allowLazyOpen;
private final Blocked blocked;
private final DatafeedConfig datafeedConfig;

private Job(String jobId, String jobType, List<String> groups, String description,
Date createTime, Date finishedTime,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long dailyModelSnapshotRetentionAfterDays, Long resultsRetentionDays,
Map<String, Object> customSettings, String modelSnapshotId, String resultsIndexName, Boolean deleting,
Boolean allowLazyOpen, Blocked blocked) {
Boolean allowLazyOpen, Blocked blocked, DatafeedConfig datafeedConfig) {

this.jobId = jobId;
this.jobType = jobType;
Expand All @@ -146,6 +151,7 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
this.deleting = deleting;
this.allowLazyOpen = allowLazyOpen;
this.blocked = blocked;
this.datafeedConfig = datafeedConfig;
}

/**
Expand Down Expand Up @@ -286,6 +292,14 @@ public Blocked getBlocked() {
return blocked;
}

/**
* The currently configured datafeed for the job
* @return Optional of the datafeed config. Will be none if a datafeed is not configured for this job
*/
public Optional<DatafeedConfig> getDatafeedConfig() {
return Optional.ofNullable(datafeedConfig);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -350,6 +364,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (blocked != null) {
builder.field(BLOCKED.getPreferredName(), blocked);
}
if (datafeedConfig != null) {
builder.field(DATAFEED_CONFIG.getPreferredName(), datafeedConfig, params);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -385,15 +402,16 @@ public boolean equals(Object other) {
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting)
&& Objects.equals(this.allowLazyOpen, that.allowLazyOpen)
&& Objects.equals(this.blocked, that.blocked);
&& Objects.equals(this.blocked, that.blocked)
&& Objects.equals(this.datafeedConfig, that.datafeedConfig);
}

@Override
public int hashCode() {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
customSettings, modelSnapshotId, resultsIndexName, deleting, allowLazyOpen, blocked);
customSettings, modelSnapshotId, resultsIndexName, deleting, allowLazyOpen, blocked, datafeedConfig);
}

@Override
Expand Down Expand Up @@ -428,6 +446,7 @@ public static class Builder {
private Boolean deleting;
private Boolean allowLazyOpen;
private Blocked blocked;
private DatafeedConfig.Builder datafeedConfig;

private Builder() {
}
Expand Down Expand Up @@ -458,6 +477,7 @@ public Builder(Job job) {
this.deleting = job.getDeleting();
this.allowLazyOpen = job.getAllowLazyOpen();
this.blocked = job.getBlocked();
this.datafeedConfig = job.getDatafeedConfig().isPresent() ? new DatafeedConfig.Builder(job.datafeedConfig) : null;
}

public Builder setId(String id) {
Expand Down Expand Up @@ -569,6 +589,11 @@ Builder setBlocked(Blocked blocked) {
return this;
}

public Builder setDatafeed(DatafeedConfig.Builder datafeed) {
this.datafeedConfig = datafeed;
return this;
}

/**
* Builds a job.
*
Expand All @@ -581,7 +606,8 @@ public Job build() {
id, jobType, groups, description, createTime, finishedTime,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
customSettings, modelSnapshotId, resultsIndexName, deleting, allowLazyOpen, blocked);
customSettings, modelSnapshotId, resultsIndexName, deleting, allowLazyOpen, blocked,
datafeedConfig == null ? null : datafeedConfig.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,38 @@ public void testGetJob() throws Exception {
assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2));
}

public void testGetJobWithDatafeed() throws Exception {
String jobId = "hlrc-job-with-datafeed";

Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);

String datafeedId = "datafeed-" + jobId;
DatafeedConfig datafeedConfig = DatafeedConfig.builder(datafeedId, jobId).setIndices("some_data_index").build();

execute(new PutDatafeedRequest(datafeedConfig), machineLearningClient::putDatafeed, machineLearningClient::putDatafeedAsync);

// Test getting specific job
GetJobResponse response = execute(new GetJobRequest(jobId), machineLearningClient::getJob, machineLearningClient::getJobAsync);
assertThat(response.jobs(), hasSize(1));
assertThat(response.jobs().get(0).getDatafeedConfig().orElse(null), is(notNullValue()));
}

public void testPutJobWithDatafeed() throws Exception {
String jobId = "hlrc-put-job-with-datafeed";

Job.Builder job = buildJobBuilder(jobId).setDatafeed(DatafeedConfig.builder(jobId, jobId).setIndices("some_data_index"));

MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job.build()), RequestOptions.DEFAULT);

// Test getting specific job
GetJobResponse response = execute(new GetJobRequest(jobId), machineLearningClient::getJob, machineLearningClient::getJobAsync);
assertThat(response.jobs(), hasSize(1));
assertThat(response.jobs().get(0).getDatafeedConfig().orElse(null), is(notNullValue()));
}

public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
Expand Down Expand Up @@ -2810,6 +2842,10 @@ private static Job buildJobForExpiredDataTests(String jobId) {
}

public static Job buildJob(String jobId) {
return buildJobBuilder(jobId).build();
}

public static Job.Builder buildJobBuilder(String jobId) {
Job.Builder builder = new Job.Builder(jobId);
builder.setDescription(randomAlphaOfLength(10));

Expand All @@ -2828,8 +2864,7 @@ public static Job buildJob(String jobId) {
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
dataDescription.setTimeField("timestamp");
builder.setDataDescription(dataDescription);

return builder.build();
return builder;
}

private void putJob(Job job) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public static DatafeedConfig createRandom() {
return createRandomBuilder().build();
}

public static DatafeedConfig.Builder createRandomBuilder() {
public static DatafeedConfig.Builder createRandomBuilder(String datafeedId, String jobId) {
long bucketSpanMillis = 3600000;
DatafeedConfig.Builder builder = constructBuilder();
DatafeedConfig.Builder builder = constructBuilder(datafeedId, jobId);
builder.setIndices(randomStringList(1, 10));
if (randomBoolean()) {
try {
Expand Down Expand Up @@ -119,6 +119,10 @@ public static DatafeedConfig.Builder createRandomBuilder() {
return builder;
}

public static DatafeedConfig.Builder createRandomBuilder() {
return createRandomBuilder(randomValidDatafeedId(), randomAlphaOfLength(10));
}

public static List<String> randomStringList(int min, int max) {
int size = scaledRandomIntBetween(min, max);
List<String> list = new ArrayList<>();
Expand Down Expand Up @@ -175,8 +179,8 @@ public static String randomValidDatafeedId() {
return generator.ofCodePointsLength(random(), 10, 10);
}

private static DatafeedConfig.Builder constructBuilder() {
return new DatafeedConfig.Builder(randomValidDatafeedId(), randomAlphaOfLength(10));
private static DatafeedConfig.Builder constructBuilder(String datafeedId, String jobId) {
return new DatafeedConfig.Builder(datafeedId, jobId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.client.ml.job.config;

import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;

import org.elasticsearch.client.ml.datafeed.DatafeedConfigTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
Expand Down Expand Up @@ -163,6 +165,9 @@ public static Job.Builder createRandomizedJobBuilder() {
if (randomBoolean()) {
builder.setBlocked(BlockedTests.createRandom());
}
if (randomBoolean()) {
builder.setDatafeed(DatafeedConfigTests.createRandomBuilder(jobId, jobId));
}
return builder;
}

Expand Down