Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delayed datacheck to the datafeed job runner #35387

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2066230
ML: Adding missing datacheck to datafeedjob
benwtrent Nov 7, 2018
a74221c
Adding client side and docs
benwtrent Nov 7, 2018
bdf37b1
Making adjustments to validations
benwtrent Nov 8, 2018
a74421c
Making values default to on, having more sensible limits
benwtrent Nov 8, 2018
c075d26
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 8, 2018
37c7319
Intermittent commit, still need to figure out interval
benwtrent Nov 8, 2018
7f30afe
Adjusting delayed data check interval
benwtrent Nov 9, 2018
b462a72
updating docs
benwtrent Nov 9, 2018
de21564
Making parameter Boolean, so it is nullable
benwtrent Nov 9, 2018
f678265
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 9, 2018
faa5fe1
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 12, 2018
352a03f
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 12, 2018
4e00aba
bumping bwc to 7 before backport
benwtrent Nov 13, 2018
f42f6a4
changing to version current
benwtrent Nov 13, 2018
8c87189
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 13, 2018
061f9a3
moving delayed data check config its own object
benwtrent Nov 13, 2018
183f7e7
Separation of duties for delayed data detection
benwtrent Nov 14, 2018
9655add
fixing checkstyles
benwtrent Nov 14, 2018
2becd81
fixing checkstyles
benwtrent Nov 14, 2018
47bbedb
Adjusting default behavior so that null windows are allowed
benwtrent Nov 14, 2018
1fc77a9
Mentioning the default value
benwtrent Nov 14, 2018
49de320
Fixing comments, syncing up validations
benwtrent Nov 15, 2018
55d0980
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");

public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
Expand All @@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject {
}, SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -107,10 +109,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;


private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -122,6 +126,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
}

public String getId() {
Expand Down Expand Up @@ -168,6 +173,10 @@ public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}

/**
 * Returns the delayed data check settings held by this datafeed configuration.
 *
 * @return the {@link DelayedDataCheckConfig}, or {@code null} when none was passed in at construction
 */
public DelayedDataCheckConfig getDelayedDataCheckConfig() {
    return this.delayedDataCheckConfig;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -204,6 +213,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
if (delayedDataCheckConfig != null) {
builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
}

builder.endObject();
return builder;
Expand Down Expand Up @@ -244,7 +256,8 @@ public boolean equals(Object other) {
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
}

/**
Expand All @@ -255,7 +268,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

public static Builder builder(String id, String jobId) {
Expand All @@ -275,6 +288,7 @@ public static class Builder {
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;

public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
Expand All @@ -293,6 +307,7 @@ public Builder(DatafeedConfig config) {
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
}

public Builder setIndices(List<String> indices) {
Expand Down Expand Up @@ -366,9 +381,21 @@ public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
return this;
}

/**
 * Sets the delayed data check settings to use for the datafeed being built.
 *
 * The value is stored as given; consult {@link DelayedDataCheckConfig} for the meaning of
 * the individual settings and for the factory methods that create one.
 *
 * @param delayedDataCheckConfig the delayed data check configuration
 * @return this builder, to allow call chaining
 */
public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
    this.delayedDataCheckConfig = delayedDataCheckConfig;
    return this;
}

public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class DatafeedUpdate implements ToXContentObject {
}, DatafeedConfig.SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig,
DelayedDataCheckConfig.PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -96,10 +99,11 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;

private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -111,6 +115,7 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
}

/**
Expand Down Expand Up @@ -146,6 +151,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
if (delayedDataCheckConfig != null) {
builder.field(DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
}
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
builder.endObject();
Expand Down Expand Up @@ -198,6 +206,10 @@ public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}

/**
 * Returns the delayed data check settings carried by this update.
 *
 * @return the {@link DelayedDataCheckConfig}, or {@code null} when this update does not contain one
 */
public DelayedDataCheckConfig getDelayedDataCheckConfig() {
    return this.delayedDataCheckConfig;
}

/**
 * Converts JSON-encoded bytes into a map representation.
 *
 * @param bytesReference the JSON bytes to convert, may be {@code null}
 * @return the converted map, or {@code null} when {@code bytesReference} is {@code null}
 */
private static Map<String, Object> asMap(BytesReference bytesReference) {
    if (bytesReference == null) {
        return null;
    }
    return XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}
Expand Down Expand Up @@ -232,6 +244,7 @@ public boolean equals(Object other) {
&& Objects.equals(asMap(this.query), asMap(that.query))
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
Expand All @@ -244,7 +257,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

public static Builder builder(String id) {
Expand All @@ -264,6 +277,7 @@ public static class Builder {
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;

public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
Expand All @@ -281,6 +295,7 @@ public Builder(DatafeedUpdate config) {
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
}

public Builder setJobId(String jobId) {
Expand Down Expand Up @@ -359,9 +374,14 @@ public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
return this;
}

/**
 * Sets the delayed data check settings to apply with this update.
 *
 * See {@link DelayedDataCheckConfig} for more information.
 *
 * @param delayedDataCheckConfig the delayed data check configuration
 * @return this builder, to allow call chaining
 */
public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
    this.delayedDataCheckConfig = delayedDataCheckConfig;
    return this;
}

public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.datafeed;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

/**
 * The configuration object containing the delayed data check settings.
 *
 * See {@link DelayedDataCheckConfig#enabledDelayedDataCheckConfig(TimeValue)} for creating a new
 * enabled datacheck with the given check_window.
 *
 * See {@link DelayedDataCheckConfig#disabledDelayedDataCheckConfig()} for creating a config for disabling
 * delayed data checking.
 */
public class DelayedDataCheckConfig implements ToXContentObject {

    public static final ParseField ENABLED = new ParseField("enabled");
    public static final ParseField CHECK_WINDOW = new ParseField("check_window");

    // The parser is lenient (the second constructor argument is "true") so that fields added
    // in later versions do not break parsing. "enabled" is required, "check_window" is optional.
    public static final ConstructingObjectParser<DelayedDataCheckConfig, Void> PARSER = new ConstructingObjectParser<>(
        "delayed_data_check_config", true, a -> new DelayedDataCheckConfig((Boolean) a[0], (TimeValue) a[1]));
    static {
        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
            // check_window is only accepted in its string representation
            if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
                return TimeValue.parseTimeValue(p.text(), CHECK_WINDOW.getPreferredName());
            }
            throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
        }, CHECK_WINDOW, ObjectParser.ValueType.STRING);
    }

    /**
     * This creates a new DelayedDataCheckConfig that has a check_window of the passed `timeValue`
     *
     * We query the index to the latest finalized bucket from this TimeValue in the past looking to see if any data has been indexed
     * since the data was read with the Datafeed.
     *
     * The window must be larger than the {@link org.elasticsearch.client.ml.job.config.AnalysisConfig#bucketSpan}, less than
     * 24 hours, and span less than 10,000x buckets.
     *
     * @param timeValue The time length in the past from the latest finalized bucket to look for latent data.
     *                  May be {@code null}, in which case no check_window is written and the default applies.
     *                  The default value is 2 hours
     * @return an enabled delayed data check configuration
     **/
    public static DelayedDataCheckConfig enabledDelayedDataCheckConfig(TimeValue timeValue) {
        return new DelayedDataCheckConfig(true, timeValue);
    }

    /**
     * This creates a new DelayedDataCheckConfig that disables the data check.
     *
     * @return a disabled delayed data check configuration without a check_window
     */
    public static DelayedDataCheckConfig disabledDelayedDataCheckConfig() {
        return new DelayedDataCheckConfig(false, null);
    }

    private final boolean enabled;
    private final TimeValue checkWindow;

    /**
     * Creates a configuration from already parsed values.
     *
     * @param enabled     whether the delayed data check is enabled, must not be {@code null}
     * @param checkWindow the check window, may be {@code null}; the parser declares it optional, so an
     *                    enabled configuration without an explicit window must be constructible
     * @throws NullPointerException if {@code enabled} is {@code null}
     */
    DelayedDataCheckConfig(Boolean enabled, TimeValue checkWindow) {
        // Fail with a named field instead of a message-less NPE from auto-unboxing.
        this.enabled = Objects.requireNonNull(enabled, ENABLED.getPreferredName());
        this.checkWindow = checkWindow;
    }

    /**
     * @return {@code true} if the delayed data check is enabled
     */
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * @return the configured check window, or {@code null} if none was set
     */
    @Nullable
    public TimeValue getCheckWindow() {
        return checkWindow;
    }

    /**
     * Writes this configuration as an object containing "enabled" and, only when set, "check_window".
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(ENABLED.getPreferredName(), enabled);
        if (checkWindow != null) {
            builder.field(CHECK_WINDOW.getPreferredName(), checkWindow.getStringRep());
        }
        builder.endObject();
        return builder;
    }

    @Override
    public int hashCode() {
        return Objects.hash(enabled, checkWindow);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }

        DelayedDataCheckConfig other = (DelayedDataCheckConfig) obj;
        // "enabled" is a primitive, so compare it directly rather than boxing it
        return this.enabled == other.enabled && Objects.equals(this.checkWindow, other.checkWindow);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.client.ml.datafeed.DatafeedStats;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.client.ml.datafeed.DelayedDataCheckConfig;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.AnalysisLimits;
import org.elasticsearch.client.ml.job.config.DataDescription;
Expand Down Expand Up @@ -576,6 +577,14 @@ public void testPutDatafeed() throws Exception {
datafeedBuilder.setQueryDelay(TimeValue.timeValueMinutes(1)); // <1>
// end::put-datafeed-config-set-query-delay

// tag::put-datafeed-config-set-delayed-data-check-config
datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig
.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(1))); // <1>
// end::put-datafeed-config-set-delayed-data-check-config

// clear the config again so this example does not trip internal validations that depend on the job's bucket span
datafeedBuilder.setDelayedDataCheckConfig(null);

List<SearchSourceBuilder.ScriptField> scriptFields = Collections.emptyList();
// tag::put-datafeed-config-set-script-fields
datafeedBuilder.setScriptFields(scriptFields); // <1>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public static DatafeedConfig.Builder createRandomBuilder() {
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
if (randomBoolean()) {
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public static DatafeedUpdate createRandom() {
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
return builder.build();
}

Expand Down