Skip to content

Commit

Permalink
[ML] Add latest search interval to datafeed stats (#82620)
Browse files Browse the repository at this point in the history
This commit adds `search_interval` to the datafeed stats API
`running_state` object. When the datafeed is running, it reports
the last search interval that was searched. This is useful for
understanding the point in time up to which the datafeed has
currently searched.

Closes #82405
  • Loading branch information
dimitris-athanasiou committed Jan 16, 2022
1 parent a5e7984 commit 93777b4
Show file tree
Hide file tree
Showing 21 changed files with 304 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ has no configured `end` time.
(boolean) Indicates whether the {dfeed} has finished running on the available
past data. For {dfeeds} without a configured `end` time, this means that
the {dfeed} is now running on "real-time" data.
`search_interval`:::
(Optional, object) Provides the latest time interval the {dfeed} has searched.
+
[%collapsible%open]
=====
`start_ms`::::
The start time as an epoch in milliseconds.

`end_ms`::::
The end time as an epoch in milliseconds.
=====
====
--

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -76,40 +79,58 @@ public static class RunningState implements Writeable, ToXContentObject {
// Has the look back finished and are we now running on "real-time" data
private final boolean realTimeRunning;

// The current time interval that datafeed is searching
@Nullable
private final SearchInterval searchInterval;

/**
 * @param realTimeConfigured whether the datafeed has no configured end time, i.e. is expected to run on real-time data
 * @param realTimeRunning whether look back has finished and the datafeed is now running on real-time data
 * @param searchInterval the latest time interval searched by the datafeed, or {@code null} if not known
 */
public RunningState(boolean realTimeConfigured, boolean realTimeRunning, @Nullable SearchInterval searchInterval) {
    this.realTimeConfigured = realTimeConfigured;
    this.realTimeRunning = realTimeRunning;
    this.searchInterval = searchInterval;
}

/**
 * Reads a running state off the wire. {@code search_interval} only exists on
 * streams from nodes that are on or after version 8.1.0; older streams leave it null.
 */
public RunningState(StreamInput in) throws IOException {
    this.realTimeConfigured = in.readBoolean();
    this.realTimeRunning = in.readBoolean();
    this.searchInterval = in.getVersion().onOrAfter(Version.V_8_1_0) ? in.readOptionalWriteable(SearchInterval::new) : null;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    RunningState that = (RunningState) o;
    // searchInterval is nullable, so compare it with Objects.equals
    return realTimeConfigured == that.realTimeConfigured
        && realTimeRunning == that.realTimeRunning
        && Objects.equals(searchInterval, that.searchInterval);
}

@Override
public int hashCode() {
    // Must stay consistent with equals: include all three compared fields.
    return Objects.hash(realTimeConfigured, realTimeRunning, searchInterval);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    out.writeBoolean(realTimeConfigured);
    out.writeBoolean(realTimeRunning);
    // Nodes before 8.1.0 do not know about search_interval; only serialize it
    // when the receiver can read it, keeping the wire format backwards compatible.
    if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
        out.writeOptionalWriteable(searchInterval);
    }
}

/**
 * Renders the running state; {@code search_interval} is omitted while it is unknown.
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    builder.startObject();
    builder.field("real_time_configured", realTimeConfigured).field("real_time_running", realTimeRunning);
    if (searchInterval != null) {
        builder.field("search_interval", searchInterval);
    }
    return builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.datafeed;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
 * The time interval a datafeed has searched, as epoch millisecond bounds.
 * Rendered with both human-readable and {@code _ms} fields via {@code timeField}.
 */
public record SearchInterval(long startMs, long endMs) implements ToXContentObject, Writeable {

    public static final ParseField START = new ParseField("start");
    public static final ParseField START_MS = new ParseField("start_ms");
    public static final ParseField END = new ParseField("end");
    public static final ParseField END_MS = new ParseField("end_ms");

    public SearchInterval(StreamInput in) throws IOException {
        this(in.readVLong(), in.readVLong());
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(startMs);
        out.writeVLong(endMs);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.timeField(START_MS.getPreferredName(), START.getPreferredName(), startMs);
        builder.timeField(END_MS.getPreferredName(), END.getPreferredName(), endMs);
        return builder.endObject();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
*/
package org.elasticsearch.xpack.core.ml.datafeed.extractor;

import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;

import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

public interface DataExtractor {

record Result(SearchInterval searchInterval, Optional<InputStream> data) {}

/**
* @return {@code true} if the search has not finished yet, or {@code false} otherwise
*/
Expand All @@ -20,10 +24,10 @@ public interface DataExtractor {
/**
* Returns the next available extracted data. Note that it is possible for the
* extracted data to be empty the last time this method can be called.
* @return an optional input stream with the next available extracted data
* @return a result with the search interval and an optional input stream with the next available extracted data
* @throws IOException if an error occurs while extracting the data
*/
Optional<InputStream> next() throws IOException;
Result next() throws IOException;

/**
* @return {@code true} if the extractor has been cancelled, or {@code false} otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction.Response;
import org.elasticsearch.xpack.core.ml.datafeed.SearchIntervalTests;

import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -17,7 +18,7 @@
public class GetDatafeedRunningStateActionResponseTests extends AbstractWireSerializingTestCase<Response> {

static Response.RunningState randomRunningState() {
    // Exercise both the null and the populated search_interval branches.
    return new Response.RunningState(randomBoolean(), randomBoolean(), randomBoolean() ? null : SearchIntervalTests.createRandom());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.datafeed;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

public class SearchIntervalTests extends AbstractWireSerializingTestCase<SearchInterval> {

    /** Builds a random interval whose end is never before its start. */
    public static SearchInterval createRandom() {
        long startMs = randomNonNegativeLong();
        long endMs = randomLongBetween(startMs, Long.MAX_VALUE);
        return new SearchInterval(startMs, endMs);
    }

    @Override
    protected SearchInterval createTestInstance() {
        return createRandom();
    }

    @Override
    protected Writeable.Reader<SearchInterval> instanceReader() {
        return SearchInterval::new;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
Collectors.toMap(
StartDatafeedAction.DatafeedParams::getDatafeedId,
// If it isn't assigned to a node, assume that look back hasn't completed yet
params -> new Response.RunningState(params.getEndTime() == null, false)
params -> new Response.RunningState(params.getEndTime() == null, false, null)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private void isDateNanos(DatafeedConfig datafeed, String timeField, ActionListen
/** Visible for testing */
static void previewDatafeed(DataExtractor dataExtractor, ActionListener<PreviewDatafeedAction.Response> listener) {
try {
Optional<InputStream> inputStream = dataExtractor.next();
Optional<InputStream> inputStream = dataExtractor.next().data();
// DataExtractor returns single-line JSON but without newline characters between objects.
// Instead, it has a space between objects due to how JSON XContentBuilder works.
// In order to return a proper JSON array from preview, we surround with square brackets and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,14 @@ public GetDatafeedRunningStateAction.Response.RunningState getRunningState() {
// reasonable to say real-time running hasn't started yet. The state will quickly
// change once the datafeed runner gets going and determines where the datafeed is up
// to.
return new GetDatafeedRunningStateAction.Response.RunningState(endTime == null, false);
return new GetDatafeedRunningStateAction.Response.RunningState(endTime == null, false, null);
}
}
return new GetDatafeedRunningStateAction.Response.RunningState(endTime == null, datafeedRunner.finishedLookBack(this));
return new GetDatafeedRunningStateAction.Response.RunningState(
endTime == null,
datafeedRunner.finishedLookBack(this),
datafeedRunner.getSearchInterval(this)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.action.PostDataAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
Expand Down Expand Up @@ -77,6 +79,7 @@ class DatafeedJob {
private volatile boolean isIsolated;
private volatile boolean haveEverSeenData;
private volatile long consecutiveDelayedDataBuckets;
private volatile SearchInterval searchInterval;

DatafeedJob(
String jobId,
Expand Down Expand Up @@ -138,6 +141,11 @@ public void finishReportingTimingStats() {
timingStatsReporter.finishReporting();
}

/**
 * Returns the latest time interval this datafeed job has searched,
 * or {@code null} if no search has recorded an interval yet.
 */
@Nullable
public SearchInterval getSearchInterval() {
    return searchInterval;
}

Long runLookBack(long startTime, Long endTime) throws Exception {
lookbackStartTimeMs = skipToStartTime(startTime);
Optional<Long> endMs = Optional.ofNullable(endTime);
Expand Down Expand Up @@ -358,7 +366,9 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro

Optional<InputStream> extractedData;
try {
extractedData = dataExtractor.next();
DataExtractor.Result result = dataExtractor.next();
extractedData = result.data();
searchInterval = result.searchInterval();
} catch (Exception e) {
LOGGER.error(new ParameterizedMessage("[{}] error while extracting data", jobId), e);
// When extraction problems are encountered, we do not want to advance time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
Expand Down Expand Up @@ -212,6 +213,11 @@ public boolean finishedLookBack(TransportStartDatafeedAction.DatafeedTask task)
return holder != null && holder.isLookbackFinished();
}

/**
 * Returns the search interval of the datafeed job backing the given task,
 * or {@code null} when the datafeed is not running on this node.
 */
public SearchInterval getSearchInterval(TransportStartDatafeedAction.DatafeedTask task) {
    Holder taskHolder = runningDatafeedsOnThisNode.get(task.getAllocationId());
    if (taskHolder == null) {
        return null;
    }
    return taskHolder.datafeedJob.getSearchInterval();
}

// Important: Holder must be created and assigned to DatafeedTask before setting state to started,
// otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel
// the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.SearchInterval;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand Down Expand Up @@ -84,16 +84,17 @@ public long getEndTime() {
}

@Override
public Optional<InputStream> next() throws IOException {
public Result next() throws IOException {
if (hasNext() == false) {
throw new NoSuchElementException();
}

SearchInterval searchInterval = new SearchInterval(context.start, context.end);
if (aggregationToJsonProcessor == null) {
Aggregations aggs = search();
if (aggs == null) {
hasNext = false;
return Optional.empty();
return new Result(searchInterval, Optional.empty());
}
initAggregationProcessor(aggs);
}
Expand All @@ -104,9 +105,12 @@ public Optional<InputStream> next() throws IOException {
// We process the whole search. So, if we are chunking or not, we have nothing more to process given the current query
hasNext = false;

return aggregationToJsonProcessor.getKeyValueCount() > 0
? Optional.of(new ByteArrayInputStream(outputStream.toByteArray()))
: Optional.empty();
return new Result(
searchInterval,
aggregationToJsonProcessor.getKeyValueCount() > 0
? Optional.of(new ByteArrayInputStream(outputStream.toByteArray()))
: Optional.empty()
);
}

private Aggregations search() {
Expand Down

0 comments on commit 93777b4

Please sign in to comment.