Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add delayed datacheck to the datafeed job runner #35387

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2066230
ML: Adding missing datacheck to datafeedjob
benwtrent Nov 7, 2018
a74221c
Adding client side and docs
benwtrent Nov 7, 2018
bdf37b1
Making adjustments to validations
benwtrent Nov 8, 2018
a74421c
Making values default to on, having more sensible limits
benwtrent Nov 8, 2018
c075d26
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 8, 2018
37c7319
Intermittent commit, still need to figure out interval
benwtrent Nov 8, 2018
7f30afe
Adjusting delayed data check interval
benwtrent Nov 9, 2018
b462a72
updating docs
benwtrent Nov 9, 2018
de21564
Making parameter Boolean, so it is nullable
benwtrent Nov 9, 2018
f678265
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 9, 2018
faa5fe1
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 12, 2018
352a03f
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 12, 2018
4e00aba
bumping bwc to 7 before backport
benwtrent Nov 13, 2018
f42f6a4
changing to version current
benwtrent Nov 13, 2018
8c87189
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 13, 2018
061f9a3
moving delayed data check config its own object
benwtrent Nov 13, 2018
183f7e7
Separation of duties for delayed data detection
benwtrent Nov 14, 2018
9655add
fixing checkstyles
benwtrent Nov 14, 2018
2becd81
fixing checkstyles
benwtrent Nov 14, 2018
47bbedb
Adjusting default behavior so that null windows are allowed
benwtrent Nov 14, 2018
1fc77a9
Mentioning the default value
benwtrent Nov 14, 2018
49de320
Fixing comments, syncing up validations
benwtrent Nov 15, 2018
55d0980
Merge branch 'master' into feature/ml-datafeed-job-check-missing-data
benwtrent Nov 15, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField DELAYED_DATA_CHECK_WINDOW = new ParseField("delayed_data_check_window");
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
public static final ParseField SHOULD_RUN_DELAYED_DATA_CHECK = new ParseField("should_run_delayed_data_check");

public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
Expand All @@ -88,6 +90,9 @@ public class DatafeedConfig implements ToXContentObject {
}, SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
PARSER.declareString((builder, val) -> builder.setDelayedDataCheckWindow(
TimeValue.parseTimeValue(val, DELAYED_DATA_CHECK_WINDOW.getPreferredName())), DELAYED_DATA_CHECK_WINDOW);
PARSER.declareBoolean(Builder::setShouldRunDelayedDataCheck, SHOULD_RUN_DELAYED_DATA_CHECK);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -108,9 +113,16 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;

/**
* The window of time to check for missing data
*/
private final TimeValue delayedDataCheckWindow;
private final Boolean shouldRunDelayedDataCheck;

private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig, TimeValue delayedDataCheckWindow,
Boolean shouldRunDelayedDataCheck) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -122,6 +134,8 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.scriptFields = scriptFields == null ? null : Collections.unmodifiableList(scriptFields);
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckWindow = delayedDataCheckWindow;
this.shouldRunDelayedDataCheck = shouldRunDelayedDataCheck;
}

public String getId() {
Expand Down Expand Up @@ -168,6 +182,21 @@ public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}

/**
* The window of time in which to check for latent data
* @return The delayed data check window
*/
public TimeValue getDelayedDataCheckWindow() {
return delayedDataCheckWindow;
}

/**
* Should we check for delayed data
*/
public Boolean getShouldRunDelayedDataCheck() {
return shouldRunDelayedDataCheck;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -204,6 +233,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
if (delayedDataCheckWindow != null) {
builder.field(DELAYED_DATA_CHECK_WINDOW.getPreferredName(), delayedDataCheckWindow.getStringRep());
}
if (shouldRunDelayedDataCheck != null) {
builder.field(SHOULD_RUN_DELAYED_DATA_CHECK.getPreferredName(), shouldRunDelayedDataCheck);
}

builder.endObject();
return builder;
Expand Down Expand Up @@ -244,7 +279,9 @@ public boolean equals(Object other) {
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.delayedDataCheckWindow, that.delayedDataCheckWindow)
&& Objects.equals(this.shouldRunDelayedDataCheck, that.shouldRunDelayedDataCheck);
}

/**
Expand All @@ -255,7 +292,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
chunkingConfig, delayedDataCheckWindow, shouldRunDelayedDataCheck);
}

public static Builder builder(String id, String jobId) {
Expand All @@ -275,6 +312,8 @@ public static class Builder {
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private TimeValue delayedDataCheckWindow;
private Boolean shouldRunDelayedDataCheck;

public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
Expand All @@ -293,6 +332,8 @@ public Builder(DatafeedConfig config) {
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckWindow = config.getDelayedDataCheckWindow();
this.shouldRunDelayedDataCheck = config.getShouldRunDelayedDataCheck();
}

public Builder setIndices(List<String> indices) {
Expand Down Expand Up @@ -366,9 +407,38 @@ public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
return this;
}

/**
* This determines how far in the past we look for data being indexed too late for the datafeed to pick it up.
*
* We query the index to the latest finalized bucket from this TimeValue in the past looking to see if any data has been indexed
* since the data was read with the Datafeed.
*
* The window must be larger than the {@link org.elasticsearch.client.ml.job.config.AnalysisConfig#bucketSpan}, less than
* 24 hours, and span less than 10,000x buckets.
*
* @param delayedDataCheckWindow The time length in the past from the latest finalized bucket to look for latent data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit missing full stop at the end of the sentence

* Defaults to 2 hours.
*/
public Builder setDelayedDataCheckWindow(TimeValue delayedDataCheckWindow) {
this.delayedDataCheckWindow = delayedDataCheckWindow;
return this;
}

/**
* When running the datafeed in real-time, should there be additional checks for data being indexed after the datafeed
* reads from the index.
*
* @param shouldRunDelayedDataCheck when {@code false} no checks are made for latent data in the real-time datafeed
* Defaults to {@code true}
*/
public Builder setShouldRunDelayedDataCheck(Boolean shouldRunDelayedDataCheck) {
this.shouldRunDelayedDataCheck = shouldRunDelayedDataCheck;
return this;
}

public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
chunkingConfig, delayedDataCheckWindow, shouldRunDelayedDataCheck);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public class DatafeedUpdate implements ToXContentObject {
}, DatafeedConfig.SCRIPT_FIELDS);
PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG);
PARSER.declareString((builder, val) -> builder.setDelayedDataCheckWindow(
TimeValue.parseTimeValue(val, DatafeedConfig.DELAYED_DATA_CHECK_WINDOW.getPreferredName())),
DatafeedConfig.DELAYED_DATA_CHECK_WINDOW);
PARSER.declareBoolean(Builder::setShouldRunDelayedDataCheck, DatafeedConfig.SHOULD_RUN_DELAYED_DATA_CHECK);
}

private static BytesReference parseBytes(XContentParser parser) throws IOException {
Expand All @@ -96,10 +100,13 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final TimeValue delayedDataCheckWindow;
private final Boolean shouldRunDelayedDataCheck;

private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
BytesReference query, BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig, TimeValue delayedDataCheckWindow,
Boolean shouldRunDelayedDataCheck) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
Expand All @@ -111,6 +118,8 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue
this.scriptFields = scriptFields;
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckWindow = delayedDataCheckWindow;
this.shouldRunDelayedDataCheck = shouldRunDelayedDataCheck;
}

/**
Expand Down Expand Up @@ -146,6 +155,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}
if (delayedDataCheckWindow != null) {
builder.field(DatafeedConfig.DELAYED_DATA_CHECK_WINDOW.getPreferredName(), delayedDataCheckWindow.getStringRep());
}
addOptionalField(builder, DatafeedConfig.SHOULD_RUN_DELAYED_DATA_CHECK, shouldRunDelayedDataCheck);
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
builder.endObject();
Expand Down Expand Up @@ -198,6 +211,14 @@ public ChunkingConfig getChunkingConfig() {
return chunkingConfig;
}

public TimeValue getDelayedDataCheckWindow() {
return delayedDataCheckWindow;
}

public Boolean getShouldRunDelayedDataCheck() {
return shouldRunDelayedDataCheck;
}

private static Map<String, Object> asMap(BytesReference bytesReference) {
return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}
Expand Down Expand Up @@ -232,6 +253,8 @@ public boolean equals(Object other) {
&& Objects.equals(asMap(this.query), asMap(that.query))
&& Objects.equals(this.scrollSize, that.scrollSize)
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.delayedDataCheckWindow, that.delayedDataCheckWindow)
&& Objects.equals(this.shouldRunDelayedDataCheck, that.shouldRunDelayedDataCheck)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
}
Expand All @@ -244,7 +267,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig);
chunkingConfig, delayedDataCheckWindow, shouldRunDelayedDataCheck);
}

public static Builder builder(String id) {
Expand All @@ -264,6 +287,8 @@ public static class Builder {
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private TimeValue delayedDataCheckWindow;
private Boolean shouldRunDelayedDataCheck;

public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
Expand All @@ -281,6 +306,8 @@ public Builder(DatafeedUpdate config) {
this.scriptFields = config.scriptFields;
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckWindow = config.delayedDataCheckWindow;
this.shouldRunDelayedDataCheck = config.shouldRunDelayedDataCheck;
}

public Builder setJobId(String jobId) {
Expand Down Expand Up @@ -359,9 +386,19 @@ public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
return this;
}

public Builder setDelayedDataCheckWindow(TimeValue delayedDataCheckWindow) {
this.delayedDataCheckWindow = delayedDataCheckWindow;
return this;
}

public Builder setShouldRunDelayedDataCheck(boolean shouldRunDelayedDataCheck) {
this.shouldRunDelayedDataCheck = shouldRunDelayedDataCheck;
return this;
}

public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig);
chunkingConfig, delayedDataCheckWindow, shouldRunDelayedDataCheck);
}

private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,17 @@ public void testPutDatafeed() throws Exception {
datafeedBuilder.setQueryDelay(TimeValue.timeValueMinutes(1)); // <1>
// end::put-datafeed-config-set-query-delay

// tag::put-datafeed-config-set-should-run-delayed-data-check
datafeedBuilder.setShouldRunDelayedDataCheck(true); // <1>
// end::put-datafeed-config-set-should-run-delayed-data-check

// no need to accidentally trip internal validations due to job bucket size
datafeedBuilder.setShouldRunDelayedDataCheck(false);

// tag::put-datafeed-config-set-delayed-data-check-window
datafeedBuilder.setDelayedDataCheckWindow(TimeValue.timeValueMinutes(90)); // <1>
// end::put-datafeed-config-set-delayed-data-check-window

List<SearchSourceBuilder.ScriptField> scriptFields = Collections.emptyList();
// tag::put-datafeed-config-set-script-fields
datafeedBuilder.setScriptFields(scriptFields); // <1>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public static DatafeedConfig.Builder createRandomBuilder() {
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
if (randomBoolean()) {
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
builder.setShouldRunDelayedDataCheck(randomBoolean());
}
if (randomBoolean()) {
builder.setDelayedDataCheckWindow(new TimeValue(randomLongBetween(bucketSpanMillis,bucketSpanMillis*10)));
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public static DatafeedUpdate createRandom() {
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
boolean shouldRunDelayedDataCheck = randomBoolean();
builder.setShouldRunDelayedDataCheck(shouldRunDelayedDataCheck);
if (shouldRunDelayedDataCheck || randomBoolean()) {
builder.setDelayedDataCheckWindow(new TimeValue(randomLongBetween(300_001, 600_000)));
}
return builder.build();
}

Expand Down
15 changes: 15 additions & 0 deletions docs/java-rest/high-level/ml/put-datafeed.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ include-tagged::{doc-tests-file}[{api}-config-set-query-delay]
--------------------------------------------------
<1> The time interval behind real time that data is queried.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-config-set-should-run-delayed-data-check]
--------------------------------------------------
<1> Should we look for data that is ingested too late when running the datafeed
in real time. Defaults to `true`

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-config-set-delayed-data-check-window]
--------------------------------------------------
<1> How far in the past from the latest finalized bucket to check for delayed data being indexed.
The window must be larger than the Job's bucket size, but smaller than 24 hours, and span less than
10,000 buckets. Defaults to 2 hours.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-config-set-script-fields]
Expand Down
14 changes: 14 additions & 0 deletions docs/reference/ml/apis/datafeedresource.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ A {dfeed} resource has the following properties:
example: `[]`. This property is provided for backwards compatibility with
releases earlier than 6.0.0. For more information, see <<removal-of-types>>.

`delayed_data_check_window`::
(time units) The time window to query that ends at the latest finalized bucket
when determining if any data was indexed too late for the {dfeed} to see it. For
example, supply `3h` to check for latent data in the {dfeed} `indices` over the
last `3h` since the latest finalized bucket. Only valid in real-time analysis
and when `should_run_delayed_data_check` is `true`. Defaults to 2 hours.
This value must be larger than the bucket_span, less than 24 hours, and span
fewer than 10,000 buckets.

`should_run_delayed_data_check`::
(boolean) When running a real-time {dfeed}, should the `indices` be checked
for latent data. `delayed_data_check_window` must be set when this is `true`.
Defaults to `true`

[[ml-datafeed-chunking-config]]
==== Chunking Configuration Objects

Expand Down
14 changes: 14 additions & 0 deletions docs/reference/ml/apis/put-datafeed.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ You must create a job before you create a {dfeed}. You can associate only one
For example: `[]`. This property is provided for backwards compatibility with
releases earlier than 6.0.0. For more information, see <<removal-of-types>>.

`delayed_data_check_window`::
(time units) The time window to query that ends at the latest finalized bucket
when determining if any data was indexed too late for the {dfeed} to see it. For
example, supply `3h` to check for latent data in the {dfeed} `indices` over the
last `3h` since the latest finalized bucket. Only valid in real-time analysis
and when `should_run_delayed_data_check` is `true`. Defaults to 2 hours.
This value must be larger than the bucket_span, less than 24 hours, and span
fewer than 10,000 buckets.

`should_run_delayed_data_check`::
(boolean) When running a real-time {dfeed}, should the `indices` be checked
for latent data. `delayed_data_check_window` must be set when this is `true`.
Defaults true `true`

For more information about these properties,
see <<ml-datafeed-resource>>.

Expand Down