Skip to content

Commit

Permalink
Transforms: Adding basic stats API param (#104878)
Browse files Browse the repository at this point in the history
* Transforms: Adding basic stats API param

Adds an optional parameter to the `transform/_stats` API called
`basic`.  This parameter defaults to false, returning the
complete set of stats (this is the same functionality as today).

This helps reduce the latency calling for stats of one or more
transforms, as the `operationsBehind` calculation is increasingly
expensive as the size of the transform and number of nodes grow. Users
can get a basic view of the current state, health, and progress of one
or more transforms, and a second call for the complete stats set can be
made when users want to drill down into a given transform.

When `transform/_stats?basic=true`, Transforms will only return a
subset of stats obtained by information immediately available from the
main node, including `id`, `state`, `node`, `stats`, and `health`.

`checkpointing` may be omitted.

For continuous transforms, `checkpointing` will include the `last`
`checkpoint` id.  If there is a difference in data but the transform
has not started on that difference yet, the `next` checkpoint will
be included with the `position` and `progress`.

For stopped transforms, `checkpointing` will include the `last`
`checkpoint` id and the `next` `position` and `progress`.

In both cases, `operationsBehind` will never be calculated, and all
timestamp information will not be recorded.
  • Loading branch information
prwhelan committed Feb 1, 2024
1 parent b94b11a commit 341d29a
Show file tree
Hide file tree
Showing 21 changed files with 459 additions and 142 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104878.yaml
@@ -0,0 +1,5 @@
pr: 104878
summary: "Transforms: Adding basic stats API param"
area: Transform
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Expand Up @@ -168,6 +168,8 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_REQUEST_INPUT_TYPE_UNSPECIFIED_ADDED = def(8_581_00_0);
public static final TransportVersion ASYNC_SEARCH_STATUS_SUPPORTS_KEEP_ALIVE = def(8_582_00_0);
public static final TransportVersion KNN_QUERY_NUMCANDS_AS_OPTIONAL_PARAM = def(8_583_00_0);
public static final TransportVersion TRANSFORM_GET_BASIC_STATS = def(8_584_00_0);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Expand Up @@ -26,6 +26,7 @@ public final class TransformField {
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField WAIT_FOR_CHECKPOINT = new ParseField("wait_for_checkpoint");
public static final ParseField STATS_FIELD = new ParseField("stats");
public static final ParseField BASIC_STATS = new ParseField("basic");
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
public static final ParseField SOURCE = new ParseField("source");
public static final ParseField DESCRIPTION = new ParseField("description");
Expand Down
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.TaskOperationFailure;
Expand Down Expand Up @@ -53,19 +54,21 @@ public static final class Request extends BaseTasksRequest<Request> {
private final String id;
private PageParams pageParams = PageParams.defaultParams();
private boolean allowNoMatch = true;
private final boolean basic;

public static final int MAX_SIZE_RETURN = 1000;
// used internally to expand the queried id expression
private List<String> expandedIds;

public Request(String id, @Nullable TimeValue timeout) {
public Request(String id, @Nullable TimeValue timeout, boolean basic) {
setTimeout(timeout);
if (Strings.isNullOrEmpty(id) || id.equals("*")) {
this.id = Metadata.ALL;
} else {
this.id = id;
}
this.expandedIds = Collections.singletonList(this.id);
this.basic = basic;
}

public Request(StreamInput in) throws IOException {
Expand All @@ -74,6 +77,11 @@ public Request(StreamInput in) throws IOException {
expandedIds = in.readCollectionAsImmutableList(StreamInput::readString);
pageParams = new PageParams(in);
allowNoMatch = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_BASIC_STATS)) {
basic = in.readBoolean();
} else {
basic = false;
}
}

@Override
Expand Down Expand Up @@ -111,13 +119,20 @@ public void setAllowNoMatch(boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
}

public boolean isBasic() {
return basic;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeStringCollection(expandedIds);
pageParams.writeTo(out);
out.writeBoolean(allowNoMatch);
if (out.getTransportVersion().onOrAfter(TransportVersions.TRANSFORM_GET_BASIC_STATS)) {
out.writeBoolean(basic);
}
}

@Override
Expand All @@ -134,7 +149,7 @@ public ActionRequestValidationException validate() {

@Override
public int hashCode() {
return Objects.hash(id, pageParams, allowNoMatch);
return Objects.hash(id, pageParams, allowNoMatch, basic);
}

@Override
Expand All @@ -146,7 +161,10 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id) && Objects.equals(pageParams, other.pageParams) && allowNoMatch == other.allowNoMatch;
return Objects.equals(id, other.id)
&& Objects.equals(pageParams, other.pageParams)
&& allowNoMatch == other.allowNoMatch
&& basic == other.basic;
}

@Override
Expand Down
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -261,24 +260,5 @@ public String toString() {
public String value() {
return name().toLowerCase(Locale.ROOT);
}

// only used when speaking to nodes < 7.4 (can be removed for 8.0)
public Tuple<TransformTaskState, IndexerState> toComponents() {

return switch (this) {
case STARTED -> new Tuple<>(TransformTaskState.STARTED, IndexerState.STARTED);
case INDEXING -> new Tuple<>(TransformTaskState.STARTED, IndexerState.INDEXING);
case ABORTING -> new Tuple<>(TransformTaskState.STARTED, IndexerState.ABORTING);
case STOPPING ->
// This one is not deterministic, because an overall state of STOPPING could arise
// from either (STARTED, STOPPED) or (STARTED, STOPPING). However, (STARTED, STOPPED)
// is a very short-lived state so it's reasonable to assume the other, especially
// as this method is only for mixed version cluster compatibility.
new Tuple<>(TransformTaskState.STARTED, IndexerState.STOPPING);
case STOPPED -> new Tuple<>(TransformTaskState.STOPPED, null);
case FAILED -> new Tuple<>(TransformTaskState.FAILED, null);
default -> throw new IllegalStateException("Unexpected state enum value: " + this);
};
}
}
}
Expand Up @@ -27,7 +27,8 @@ public class GetTransformStatsActionRequestTests extends AbstractWireSerializing
protected Request createTestInstance() {
return new Request(
randomBoolean() ? randomAlphaOfLengthBetween(1, 20) : randomBoolean() ? Metadata.ALL : null,
randomBoolean() ? TimeValue.parseTimeValue(randomTimeValue(), "timeout") : null
randomBoolean() ? TimeValue.parseTimeValue(randomTimeValue(), "timeout") : null,
randomBoolean()
);
}

Expand All @@ -42,7 +43,7 @@ protected Writeable.Reader<Request> instanceReader() {
}

public void testCreateTask() {
Request request = new Request("some-transform", null);
Request request = new Request("some-transform", null, false);
Task task = request.createTask(123, "type", "action", TaskId.EMPTY_TASK_ID, Map.of());
assertThat(task, is(instanceOf(CancellableTask.class)));
assertThat(task.getDescription(), is(equalTo("get_transform_stats[some-transform]")));
Expand Down
Expand Up @@ -189,7 +189,7 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
assertBusy(() -> {
// Verify that each transform processed an expected number of documents.
for (String transformId : transformIds) {
Map<?, ?> stats = getTransformStats(transformId);
Map<?, ?> stats = getBasicTransformStats(transformId);
assertThat(
"Stats were: " + stats,
XContentMapValues.extractValue(stats, "stats", "documents_processed"),
Expand Down

0 comments on commit 341d29a

Please sign in to comment.