Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.QueryMetric;

import java.util.function.BiConsumer;

import static org.elasticsearch.action.ActionListener.wrap;

public class PlanExecutor {
Expand All @@ -48,7 +51,8 @@ public void esql(
String sessionId,
EsqlConfiguration cfg,
EnrichPolicyResolver enrichPolicyResolver,
ActionListener<PhysicalPlan> listener
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
ActionListener<Result> listener
) {
final var session = new EsqlSession(
sessionId,
Expand All @@ -63,7 +67,7 @@ public void esql(
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
session.execute(request, wrap(listener::onResponse, ex -> {
session.execute(request, runPhase, wrap(listener::onResponse, ex -> {
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
metrics.failed(clientId);
listener.onFailure(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.Result;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -89,8 +90,6 @@
* Computes the result of a {@link PhysicalPlan}.
*/
public class ComputeService {
public record Result(List<Page> pages, List<DriverProfile> profiles) {}

private static final Logger LOGGER = LogManager.getLogger(ComputeService.class);
private final SearchService searchService;
private final BigArrays bigArrays;
Expand Down Expand Up @@ -176,7 +175,7 @@ public void execute(
rootTask,
computeContext,
coordinatorPlan,
listener.map(driverProfiles -> new Result(collectedPages, driverProfiles))
listener.map(driverProfiles -> new Result(physicalPlan.output(), collectedPages, driverProfiles))
);
return;
} else {
Expand All @@ -201,7 +200,9 @@ public void execute(
);
try (
Releasable ignored = exchangeSource.addEmptySink();
RefCountingListener refs = new RefCountingListener(listener.map(unused -> new Result(collectedPages, collectedProfiles)))
RefCountingListener refs = new RefCountingListener(
listener.map(unused -> new Result(physicalPlan.output(), collectedPages, collectedProfiles))
)
) {
// run compute on the coordinator
exchangeSource.addCompletionListener(refs.acquire());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.Result;

import java.io.IOException;
import java.time.ZoneOffset;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

Expand Down Expand Up @@ -157,37 +160,37 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
request.tables()
);
String sessionId = sessionID(task);
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (physicalPlan, resultListener) -> computeService.execute(
sessionId,
(CancellableTask) task,
physicalPlan,
configuration,
resultListener
);

planExecutor.esql(
request,
sessionId,
configuration,
enrichPolicyResolver,
listener.delegateFailureAndWrap(
(delegate, physicalPlan) -> computeService.execute(
sessionId,
(CancellableTask) task,
physicalPlan,
configuration,
delegate.map(result -> {
List<ColumnInfoImpl> columns = physicalPlan.output()
.stream()
.map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType()))
.toList();
EsqlQueryResponse.Profile profile = configuration.profile()
? new EsqlQueryResponse.Profile(result.profiles())
: null;
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String id = asyncTask.getExecutionId().getEncoded();
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
} else {
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
}
})
)
)
runPhase,
listener.map(result -> toResponse(task, request, configuration, result))
);
}

private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, EsqlConfiguration configuration, Result result) {
List<ColumnInfoImpl> columns = result.schema()
.stream()
.map(c -> new ColumnInfoImpl(c.qualifiedName(), c.dataType().outputType()))
.toList();
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String id = asyncTask.getExecutionId().getEncoded();
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
}
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
}

/**
* Returns the ID for this compute session. The ID is unique within the cluster, and is used
* to identify the compute-session across nodes. The ID is just the TaskID of the task that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -110,10 +111,19 @@ public String sessionId() {
return sessionId;
}

public void execute(EsqlQueryRequest request, ActionListener<PhysicalPlan> listener) {
public void execute(
EsqlQueryRequest request,
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
ActionListener<Result> listener
) {
LOGGER.debug("ESQL query:\n{}", request.query());
LogicalPlan logicalPlan = parse(request.query(), request.params());
logicalPlanToPhysicalPlan(logicalPlan, request, listener.delegateFailureAndWrap((l, r) -> runPhase.accept(r, l)));
}

private void logicalPlanToPhysicalPlan(LogicalPlan logicalPlan, EsqlQueryRequest request, ActionListener<PhysicalPlan> listener) {
optimizedPhysicalPlan(
parse(request.query(), request.params()),
logicalPlan,
listener.map(plan -> EstimatesRowSize.estimateRowSize(0, plan.transformUp(FragmentExec.class, f -> {
QueryBuilder filter = request.filter();
if (filter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,23 @@

package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;

import java.util.List;

public record Result(List<Attribute> columns, List<List<Object>> values) {}
/**
* Results from running a chunk of ESQL.
* @param schema "Schema" of the {@link Attribute}s that are produced by the {@link LogicalPlan}
* that was run. Each {@link Page} contains a {@link Block} of values for each
* attribute in this list.
* @param pages Actual values produced by running the ESQL.
* @param profiles {@link DriverProfile}s from all drivers that ran to produce the output. These
* are quite cheap to build, so we build them for all ESQL runs, regardless of if
* users have asked for them. But we only include them in the results if users ask
* for them.
*/
public record Result(List<Attribute> schema, List<Page> pages, List<DriverProfile> profiles) {}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.esql.execution.PlanExecutor;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
import org.junit.After;
import org.junit.Before;
Expand All @@ -33,6 +34,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -100,9 +102,10 @@ public void testFailedMetric() {
var request = new EsqlQueryRequest();
// test a failed query: xyz field doesn't exist
request.query("from test | stats m = max(xyz)");
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() {
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (p, r) -> fail("this shouldn't happen");
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() {
@Override
public void onResponse(PhysicalPlan physicalPlan) {
public void onResponse(Result result) {
fail("this shouldn't happen");
}

Expand All @@ -119,9 +122,10 @@ public void onFailure(Exception e) {

// fix the failing query: foo field does exist
request.query("from test | stats m = max(foo)");
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, new ActionListener<>() {
runPhase = (p, r) -> r.onResponse(null);
planExecutor.esql(request, randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, runPhase, new ActionListener<>() {
@Override
public void onResponse(PhysicalPlan physicalPlan) {}
public void onResponse(Result result) {}

@Override
public void onFailure(Exception e) {
Expand Down