Skip to content

Commit

Permalink
[ML] adding running_state to datafeed stats object (#73926) (#74002)
Browse files Browse the repository at this point in the history
It is useful to know the following information when reading datafeed stats:

 - Is the datafeed a "real-time" datafeed, i.e. a datafeed without a configured `end` time
 - Has the datafeed processed all past data available at the time of starting.

This object is only available if the datafeed task has been created.

It has the form:

```
"running_state": {
  "is_real_time": <boolean>,
  "look_back_finished": <boolean>
}
```
  • Loading branch information
benwtrent committed Jun 10, 2021
1 parent 76b6cc6 commit 43cd27d
Show file tree
Hide file tree
Showing 13 changed files with 675 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Retrieves usage information for {dfeeds}.
[[ml-get-datafeed-stats-prereqs]]
== {api-prereq-title}

Requires the `monitor_ml` cluster privilege. This privilege is included in the
Requires the `monitor_ml` cluster privilege. This privilege is included in the
`machine_learning_user` built-in role.

[[ml-get-datafeed-stats-desc]]
Expand Down Expand Up @@ -103,6 +103,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-transport-address]
====
--

`running_state`::
(object) An object containing the running state for this {dfeed}. It is only
provided if the {dfeed} is started.
+
--
[%collapsible%open]
====
`real_time_configured`:::
(boolean) Indicates if the {dfeed} is "real-time"; meaning that the {dfeed}
has no configured `end` time.
`real_time_running`:::
(boolean) Indicates whether the {dfeed} has finished running on the available
past data. For {dfeeds} without a configured `end` time, this means that
the {dfeed} is now running on "real-time" data.
====
--

`state`::
(string)
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=state-datafeed]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ml.MlTasks;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;


/**
* Internal only action to get the current running state of a datafeed
*/
public class GetDatafeedRunningStateAction extends ActionType<GetDatafeedRunningStateAction.Response> {

public static final GetDatafeedRunningStateAction INSTANCE = new GetDatafeedRunningStateAction();
public static final String NAME = "cluster:internal/xpack/ml/datafeed/running_state";

private GetDatafeedRunningStateAction() {
super(NAME, GetDatafeedRunningStateAction.Response::new);
}

public static class Request extends BaseTasksRequest<Request> {

private final Set<String> datafeedTaskIds;

public Request(List<String> datafeedIds) {
this.datafeedTaskIds = datafeedIds.stream().map(MlTasks::datafeedTaskId).collect(Collectors.toSet());
}

public Request(StreamInput in) throws IOException {
super(in);
this.datafeedTaskIds = in.readSet(StreamInput::readString);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(datafeedTaskIds);
}

public Set<String> getDatafeedTaskIds() {
return datafeedTaskIds;
}

@Override
public boolean match(Task task) {
return task instanceof StartDatafeedAction.DatafeedTaskMatcher && datafeedTaskIds.contains(task.getDescription());
}
}

public static class Response extends BaseTasksResponse {

public static class RunningState implements Writeable, ToXContentObject {

// Is the datafeed a "realtime" datafeed, meaning it was started without an end_time
private final boolean realTimeConfigured;
// Has the look back finished and are we now running on "real-time" data
private final boolean realTimeRunning;

public RunningState(boolean realTimeConfigured, boolean realTimeRunning) {
this.realTimeConfigured = realTimeConfigured;
this.realTimeRunning = realTimeRunning;
}

public RunningState(StreamInput in) throws IOException {
this.realTimeConfigured = in.readBoolean();
this.realTimeRunning = in.readBoolean();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RunningState that = (RunningState) o;
return realTimeConfigured == that.realTimeConfigured && realTimeRunning == that.realTimeRunning;
}

@Override
public int hashCode() {
return Objects.hash(realTimeConfigured, realTimeRunning);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(realTimeConfigured);
out.writeBoolean(realTimeRunning);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("real_time_configured", realTimeConfigured);
builder.field("real_time_running", realTimeRunning);
builder.endObject();
return builder;
}
}

private final Map<String, RunningState> datafeedRunningState;

public static Response fromResponses(List<Response> responses) {
return new Response(responses.stream()
.flatMap(r -> r.datafeedRunningState.entrySet().stream())
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

public static Response fromTaskAndState(String datafeedId, RunningState runningState) {
return new Response(MapBuilder.<String, RunningState>newMapBuilder().put(datafeedId, runningState).map());
}

public Response(StreamInput in) throws IOException {
super(in);
datafeedRunningState = in.readMap(StreamInput::readString, RunningState::new);
}

public Response(Map<String, RunningState> runtimeStateMap) {
super(null, null);
this.datafeedRunningState = runtimeStateMap;
}

public Optional<RunningState> getRunningState(String datafeedId) {
return Optional.ofNullable(datafeedRunningState.get(datafeedId));
}

public Map<String, RunningState> getDatafeedRunningState() {
return datafeedRunningState;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(datafeedRunningState, StreamOutput::writeString, (o, w) -> w.writeTo(o));
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return Objects.equals(this.datafeedRunningState, response.datafeedRunningState);
}

@Override
public int hashCode() {
return Objects.hash(datafeedRunningState);
}
}

}

0 comments on commit 43cd27d

Please sign in to comment.