Skip to content

Commit

Permalink
Add delayed datacheck to the datafeed job runner (#35387)
Browse files Browse the repository at this point in the history
* ML: Adding missing datacheck to datafeedjob

* Adding client side and docs

* Making adjustments to validations

* Making values default to on, having more sensible limits

* Intermittent commit, still need to figure out interval

* Adjusting delayed data check interval

* updating docs

* Making parameter Boolean, so it is nullable

* bumping bwc to 7 before backport

* changing to version current

* moving delayed data check config its own object

* Separation of duties for delayed data detection

* fixing checkstyles

* fixing checkstyles

* Adjusting default behavior so that null windows are allowed

* Mentioning the default value

* Fixing comments, syncing up validations
  • Loading branch information
benwtrent committed Nov 15, 2018
1 parent 71a1066 commit 76067e4
Show file tree
Hide file tree
Showing 31 changed files with 1,136 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");

public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
Expand All @@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject {
}, SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -107,10 +109,12 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;


private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -122,6 +126,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
}

public String getId() {
Expand Down Expand Up @@ -168,6 +173,10 @@ public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}

/**
* Returns the delayed data check configuration of this datafeed.
*
* @return the {@link DelayedDataCheckConfig}, or {@code null} if none was set on the builder
*/
public DelayedDataCheckConfig getDelayedDataCheckConfig() {
return delayedDataCheckConfig;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -204,6 +213,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
if (delayedDataCheckConfig != null) {
builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
}

builder.endObject();
return builder;
Expand Down Expand Up @@ -244,7 +256,8 @@ public boolean equals(Object other) {
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
}

/**
Expand All @@ -255,7 +268,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

public static Builder builder(String id, String jobId) {
Expand All @@ -275,6 +288,7 @@ public static class Builder {
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;

public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
Expand All @@ -293,6 +307,7 @@ public Builder(DatafeedConfig config) {
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
}

public Builder setIndices(List<String> indices) {
Expand Down Expand Up @@ -366,9 +381,23 @@ public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
return this;
}

/**
* Sets the {@link DelayedDataCheckConfig} settings for the datafeed being built.
*
* See {@link DelayedDataCheckConfig} for more information.
*
* Default value is enabled, with {@code check_window} being null. This means the true window is
* calculated when the real-time Datafeed runs.
*
* @param delayedDataCheckConfig the delayed data check configuration; may be {@code null},
*                               in which case the field is omitted when the config is serialized
* @return this builder, to allow call chaining
*/
public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
this.delayedDataCheckConfig = delayedDataCheckConfig;
return this;
}

public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class DatafeedUpdate implements ToXContentObject {
}, DatafeedConfig.SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig,
DelayedDataCheckConfig.PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -96,10 +99,11 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;

private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -111,6 +115,7 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
}

/**
Expand Down Expand Up @@ -146,6 +151,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
if (delayedDataCheckConfig != null) {
builder.field(DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
}
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
builder.endObject();
Expand Down Expand Up @@ -198,6 +206,10 @@ public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}

/**
* Returns the delayed data check configuration carried by this update.
*
* @return the {@link DelayedDataCheckConfig}, or {@code null} if the update does not set one
*/
public DelayedDataCheckConfig getDelayedDataCheckConfig() {
return delayedDataCheckConfig;
}

/**
 * Converts raw JSON bytes into a map so that two byte references can be compared by content.
 *
 * @param bytesReference the JSON bytes to convert; may be {@code null}
 * @return the parsed map, or {@code null} when {@code bytesReference} is {@code null}
 */
private static Map<String, Object> asMap(BytesReference bytesReference) {
    // Absent content stays absent; only real bytes are handed to the converter.
    if (bytesReference == null) {
        return null;
    }
    return XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}
Expand Down Expand Up @@ -232,6 +244,7 @@ public boolean equals(Object other) {
&& Objects.equals(asMap(this.query), asMap(that.query))
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
Expand All @@ -244,7 +257,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

public static Builder builder(String id) {
Expand All @@ -264,6 +277,7 @@ public static class Builder {
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;

public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
Expand All @@ -281,6 +295,7 @@ public Builder(DatafeedUpdate config) {
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
}

public Builder setJobId(String jobId) {
Expand Down Expand Up @@ -359,9 +374,14 @@ public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
return this;
}

/**
* Sets the {@link DelayedDataCheckConfig} to apply with this datafeed update.
*
* @param delayedDataCheckConfig the delayed data check configuration; may be {@code null},
*                               in which case the field is omitted when the update is serialized
* @return this builder, to allow call chaining
*/
public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
this.delayedDataCheckConfig = delayedDataCheckConfig;
return this;
}

public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
chunkingConfig, delayedDataCheckConfig);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.datafeed;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

/**
 * The configuration object containing the delayed data check settings.
 *
 * See {@link DelayedDataCheckConfig#enabledDelayedDataCheckConfig(TimeValue)} for creating a new
 * enabled datacheck with the given check_window
 *
 * See {@link DelayedDataCheckConfig#disabledDelayedDataCheckConfig()} for creating a config for disabling
 * delayed data checking.
 */
public class DelayedDataCheckConfig implements ToXContentObject {

    public static final ParseField ENABLED = new ParseField("enabled");
    public static final ParseField CHECK_WINDOW = new ParseField("check_window");

    // The parser is created with the lenient flag set to true (to allow for enhancements): "enabled" is a
    // required constructor argument, "check_window" is optional and must be given as a time-value string.
    public static final ConstructingObjectParser<DelayedDataCheckConfig, Void> PARSER = new ConstructingObjectParser<>(
        "delayed_data_check_config", true, a -> new DelayedDataCheckConfig((Boolean) a[0], (TimeValue) a[1]));
    static {
        PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> {
            if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
                return TimeValue.parseTimeValue(p.text(), CHECK_WINDOW.getPreferredName());
            }
            throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
        }, CHECK_WINDOW, ObjectParser.ValueType.STRING);
    }

    /**
     * This creates a new DelayedDataCheckConfig that has a check_window of the passed {@code timeValue}
     *
     * We query the index to the latest finalized bucket from this TimeValue in the past looking to see if any data has been indexed
     * since the data was read with the Datafeed.
     *
     * The window must be larger than the {@link org.elasticsearch.client.ml.job.config.AnalysisConfig#bucketSpan}, less than
     * 24 hours, and span less than 10,000x buckets.
     *
     * @param timeValue The time length in the past from the latest finalized bucket to look for latent data.
     *                  If {@code null} is provided, the appropriate window is calculated when it is used
     * @return an enabled config using the given check window
     */
    public static DelayedDataCheckConfig enabledDelayedDataCheckConfig(TimeValue timeValue) {
        return new DelayedDataCheckConfig(true, timeValue);
    }

    /**
     * This creates a new DelayedDataCheckConfig that disables the data check.
     *
     * @return a disabled config without a check window
     */
    public static DelayedDataCheckConfig disabledDelayedDataCheckConfig() {
        return new DelayedDataCheckConfig(false, null);
    }

    private final boolean enabled;
    private final TimeValue checkWindow;

    /**
     * Creates a config from the (boxed) values produced by {@link #PARSER} or the static factories.
     *
     * @param enabled     whether the delayed data check is enabled; must not be {@code null}
     * @param checkWindow the check window, or {@code null} if none is configured
     * @throws NullPointerException if {@code enabled} is {@code null}
     */
    DelayedDataCheckConfig(Boolean enabled, TimeValue checkWindow) {
        // Fail with the field name instead of an anonymous NPE from implicit unboxing.
        this.enabled = Objects.requireNonNull(enabled, ENABLED.getPreferredName());
        this.checkWindow = checkWindow;
    }

    /**
     * @return {@code true} if the delayed data check is enabled
     */
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * @return the configured check window, or {@code null} if none was provided
     */
    @Nullable
    public TimeValue getCheckWindow() {
        return checkWindow;
    }

    /**
     * Writes this config as an object containing {@code enabled} and, only when a window is set,
     * {@code check_window} in its string representation.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(ENABLED.getPreferredName(), enabled);
        if (checkWindow != null) {
            builder.field(CHECK_WINDOW.getPreferredName(), checkWindow.getStringRep());
        }
        builder.endObject();
        return builder;
    }

    @Override
    public int hashCode() {
        return Objects.hash(enabled, checkWindow);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }

        DelayedDataCheckConfig other = (DelayedDataCheckConfig) obj;
        // Primitive comparison: no need to box both booleans through Objects.equals.
        return this.enabled == other.enabled && Objects.equals(this.checkWindow, other.checkWindow);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.client.ml.datafeed.DatafeedStats;
import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.client.ml.datafeed.DelayedDataCheckConfig;
import org.elasticsearch.client.ml.job.config.AnalysisConfig;
import org.elasticsearch.client.ml.job.config.AnalysisLimits;
import org.elasticsearch.client.ml.job.config.DataDescription;
Expand Down Expand Up @@ -583,6 +584,14 @@ public void testPutDatafeed() throws Exception {
datafeedBuilder.setQueryDelay(TimeValue.timeValueMinutes(1)); // <1>
// end::put-datafeed-config-set-query-delay

// tag::put-datafeed-config-set-delayed-data-check-config
datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig
.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(1))); // <1>
// end::put-datafeed-config-set-delayed-data-check-config

// no need to accidentally trip internal validations due to job bucket size
datafeedBuilder.setDelayedDataCheckConfig(null);

List<SearchSourceBuilder.ScriptField> scriptFields = Collections.emptyList();
// tag::put-datafeed-config-set-script-fields
datafeedBuilder.setScriptFields(scriptFields); // <1>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public static DatafeedConfig.Builder createRandomBuilder() {
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public static DatafeedUpdate createRandom() {
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
return builder.build();
}

Expand Down

0 comments on commit 76067e4

Please sign in to comment.