Skip to content

Commit

Permalink
Add status for enrich operator (#106036)
Browse files Browse the repository at this point in the history
This PR adds a status for the enrich operator. This status 
should help us answer how fast the enrich operator is.
  • Loading branch information
dnhatn committed Mar 6, 2024
1 parent 5377b4a commit e971c51
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/106036.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106036
summary: Add status for enrich operator
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0);
public static final TransportVersion DATA_STREAM_AUTO_SHARDING_EVENT = def(8_598_00_0);
public static final TransportVersion ADD_FAILURE_STORE_INDICES_OPTIONS = def(8_599_00_0);
public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,27 @@
package org.elasticsearch.compute.operator;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;

/**
* {@link AsyncOperator} performs an external computation specified in {@link #performAsync(Page, ActionListener)}.
Expand All @@ -33,6 +44,7 @@ public abstract class AsyncOperator implements Operator {
private final DriverContext driverContext;

private final int maxOutstandingRequests;
private final LongAdder totalTimeInNanos = new LongAdder();
private boolean finished = false;
private volatile boolean closed = false;

Expand Down Expand Up @@ -81,7 +93,11 @@ public void addInput(Page input) {
onFailure(e);
onSeqNoCompleted(seqNo);
});
performAsync(input, ActionListener.runAfter(listener, driverContext::removeAsyncAction));
final long startNanos = System.nanoTime();
performAsync(input, ActionListener.runAfter(listener, () -> {
driverContext.removeAsyncAction();
totalTimeInNanos.add(System.nanoTime() - startNanos);
}));
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -224,4 +240,107 @@ public SubscribableListener<Void> isBlocked() {
return blockedFuture;
}
}

@Override
public final Operator.Status status() {
return status(
Math.max(0L, checkpoint.getMaxSeqNo()),
Math.max(0L, checkpoint.getProcessedCheckpoint()),
TimeValue.timeValueNanos(totalTimeInNanos.sum()).millis()
);
}

protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) {
return new Status(receivedPages, completedPages, totalTimeInMillis);
}

public static class Status implements Operator.Status {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Operator.Status.class,
"async_operator",
Status::new
);

final long receivedPages;
final long completedPages;
final long totalTimeInMillis;

protected Status(long receivedPages, long completedPages, long totalTimeInMillis) {
this.receivedPages = receivedPages;
this.completedPages = completedPages;
this.totalTimeInMillis = totalTimeInMillis;
}

protected Status(StreamInput in) throws IOException {
this.receivedPages = in.readVLong();
this.completedPages = in.readVLong();
this.totalTimeInMillis = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(receivedPages);
out.writeVLong(completedPages);
out.writeVLong(totalTimeInMillis);
}

public long receivedPages() {
return receivedPages;
}

public long completedPages() {
return completedPages;
}

public long totalTimeInMillis() {
return totalTimeInMillis;
}

@Override
public String getWriteableName() {
return ENTRY.name;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder);
return builder.endObject();
}

protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException {
builder.field("received_pages", receivedPages);
builder.field("completed_pages", completedPages);
builder.field("total_time_in_millis", totalTimeInMillis);
if (totalTimeInMillis >= 0) {
builder.field("total_time", TimeValue.timeValueMillis(totalTimeInMillis));
}
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Status status = (Status) o;
return receivedPages == status.receivedPages
&& completedPages == status.completedPages
&& totalTimeInMillis == status.totalTimeInMillis;
}

@Override
public int hashCode() {
return Objects.hash(receivedPages, completedPages, totalTimeInMillis);
}

@Override
public String toString() {
return Strings.toString(this);
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.ESQL_ENRICH_OPERATOR_STATUS;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public long iterations() {
return iterations;
}

List<DriverStatus.OperatorStatus> operators() {
public List<DriverStatus.OperatorStatus> operators() {
return operators;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.equalTo;

public class AsyncOperatorStatusTests extends AbstractWireSerializingTestCase<AsyncOperator.Status> {
@Override
protected Writeable.Reader<AsyncOperator.Status> instanceReader() {
return AsyncOperator.Status::new;
}

@Override
protected AsyncOperator.Status createTestInstance() {
return new AsyncOperator.Status(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomLongBetween(1, TimeValue.timeValueHours(1).millis())
);
}

@Override
protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IOException {
int field = randomIntBetween(0, 2);
return switch (field) {
case 0 -> new AsyncOperator.Status(
randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong),
in.completedPages(),
in.totalTimeInMillis()
);
case 1 -> new AsyncOperator.Status(
in.receivedPages(),
randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong),
in.totalTimeInMillis()
);
case 2 -> new AsyncOperator.Status(
in.receivedPages(),
in.completedPages(),
randomValueOtherThan(in.totalTimeInMillis(), ESTestCase::randomNonNegativeLong)
);
default -> throw new AssertionError("unknown ");
};
}

public void testToXContent() {
var status = new AsyncOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis());
String json = Strings.toString(status, true, true);
assertThat(json, equalTo("""
{
"received_pages" : 100,
"completed_pages" : 50,
"total_time_in_millis" : 10000,
"total_time" : "10s"
}"""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,9 @@ protected EsqlQueryResponse runQuery(String query) {
EsqlQueryRequest request = new EsqlQueryRequest();
request.query(query);
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
if (randomBoolean()) {
request.profile(true);
}
return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverStatus;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
Expand Down Expand Up @@ -58,12 +60,15 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;

public class EnrichIT extends AbstractEsqlIntegTestCase {

Expand Down Expand Up @@ -121,6 +126,9 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) {
} else {
client = client();
}
if (request.profile() == false && randomBoolean()) {
request.profile(true);
}
if (randomBoolean()) {
setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 4096)));
try {
Expand Down Expand Up @@ -318,6 +326,27 @@ public void testTopN() {
}
}

public void testProfile() {
EsqlQueryRequest request = new EsqlQueryRequest();
request.pragmas(randomPragmas());
request.query("from listens* | sort timestamp DESC | limit 1 | " + enrichSongCommand() + " | KEEP timestamp, artist");
request.profile(true);
try (var resp = run(request)) {
Iterator<Object> row = resp.values().next();
assertThat(row.next(), equalTo(7L));
assertThat(row.next(), equalTo("Linkin Park"));
EsqlQueryResponse.Profile profile = resp.profile();
assertNotNull(profile);
List<DriverProfile> drivers = profile.drivers();
assertThat(drivers.size(), greaterThanOrEqualTo(2));
List<DriverStatus.OperatorStatus> enrichOperators = drivers.stream()
.flatMap(d -> d.operators().stream())
.filter(status -> status.operator().startsWith("EnrichOperator"))
.toList();
assertThat(enrichOperators, not(emptyList()));
}
}

/**
* Some enrich queries that could fail without the PushDownEnrich rule.
*/
Expand Down

0 comments on commit e971c51

Please sign in to comment.