From 31104b8a1a9ee83b082ce960d906b77f4dd6fa3b Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Mon, 3 Jun 2024 13:57:48 +0200 Subject: [PATCH 1/4] Add tracing to gradle run --- build-tools-internal/src/main/groovy/elasticsearch.run.gradle | 3 +++ 1 file changed, 3 insertions(+) diff --git a/build-tools-internal/src/main/groovy/elasticsearch.run.gradle b/build-tools-internal/src/main/groovy/elasticsearch.run.gradle index 3a905c001d0cf..261a9f2b229aa 100644 --- a/build-tools-internal/src/main/groovy/elasticsearch.run.gradle +++ b/build-tools-internal/src/main/groovy/elasticsearch.run.gradle @@ -29,6 +29,9 @@ testClusters.register("runTask") { systemProperty 'ingest.geoip.downloader.enabled.default', 'true' setting 'xpack.security.enabled', 'true' keystore 'bootstrap.password', 'password' + setting 'telemetry.tracing.enabled', 'true' + setting 'telemetry.agent.server_url', 'INSERT_SERVER_URL_HERE' + keystore 'telemetry.api_key', 'INSERT_API_KEY_HERE' user username: 'elastic-admin', password: 'elastic-password', role: '_es_test_root' numberOfNodes = 1 } From 8e8fda3bbcbbe6bbc1f2de156c3c9a1f79a681ec Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Tue, 4 Jun 2024 11:33:14 +0200 Subject: [PATCH 2/4] Add tracing to ccq IT --- .../java/org/elasticsearch/xpack/esql/ccq/Clusters.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index f20d758132cbb..3a4595a658447 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -21,6 +21,9 @@ public static ElasticsearchCluster remoteCluster() { .setting("node.roles", "[data,ingest,master]") .setting("xpack.security.enabled", "false") .setting("xpack.license.self_generated.type", "trial") + .setting("telemetry.tracing.enabled", "true") + .setting("telemetry.agent.server_url", "") + .keystore("telemetry.api_key", "") .shared(true) .setting("cluster.routing.rebalance.enable", "none") .build(); @@ -34,6 +37,9 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust .nodes(2) .setting("xpack.security.enabled", "false") .setting("xpack.license.self_generated.type", "trial") + .setting("telemetry.tracing.enabled", "true") + .setting("telemetry.agent.server_url", "") + .keystore("telemetry.api_key", "") .setting("node.roles", "[data,ingest,master,remote_cluster_client]") .setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"") .setting("cluster.remote.connections_per_cluster", "1") From 730e6933153f4f6770ef1b6124d7174175d255e3 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Mon, 10 Jun 2024 10:34:50 +0200 Subject: [PATCH 3/4] Add Ievgen's TracerSpan --- .../apm/internal/tracing/APMTracer.java | 5 ++ .../telemetry/tracing/Tracer.java | 10 +++ .../telemetry/tracing/TracerSpan.java | 75 +++++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java index 0b020a24eeffb..4142d39952807 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java @@ -118,6 +118,11 @@ public void setLabelFilters(List labelFilters) { this.labelFilterAutomaton = buildAutomaton(labelFilters, List.of()); } + @Override + public boolean isEnabled() { + return enabled; + } + // package-private for testing CharacterRunAutomaton getLabelFilterAutomaton() { return labelFilterAutomaton; diff --git a/server/src/main/java/org/elasticsearch/telemetry/tracing/Tracer.java b/server/src/main/java/org/elasticsearch/telemetry/tracing/Tracer.java index 6f2c98dda4e2b..21bac627587a5 100644 --- a/server/src/main/java/org/elasticsearch/telemetry/tracing/Tracer.java +++ b/server/src/main/java/org/elasticsearch/telemetry/tracing/Tracer.java @@ -139,6 +139,11 @@ public interface Tracer { */ Releasable withScope(Traceable traceable); + /** + * @return {@code if service is enabled} + */ + boolean isEnabled(); + /** * A Tracer implementation that does nothing. This is used when no tracer is configured, * in order to avoid null checks everywhere. @@ -178,6 +183,11 @@ public void setAttribute(Traceable traceable, String key, String value) {} public Releasable withScope(Traceable traceable) { return () -> {}; } + + @Override + public boolean isEnabled() { + return false; + } }; interface AttributeKeys { diff --git a/server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java b/server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java new file mode 100644 index 0000000000000..12f05ceb95d6c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.tracing; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Map; +import java.util.function.Consumer; + +public class TracerSpan { + + private record Span(String spanId) implements Traceable { + + @Override + public String getSpanId() { + return spanId; + } + + public static Span create() { + return new Span(UUIDs.randomBase64UUID()); + } + } + + public static Releasable span(ThreadPool threadPool, Tracer tracer, String name) { + return span(threadPool, tracer, name, Map.of()); + } + + /** + * Creates a new span for a synchronous block of code. + * @return a span that needs to be released once block of code is completed + */ + public static Releasable span(ThreadPool threadPool, Tracer tracer, String name, Map attributes) { + if (tracer.isEnabled() == false) { + return () -> {}; + } + var span = Span.create(); + var ctx = threadPool.getThreadContext().newTraceContext(); + tracer.startTrace(threadPool.getThreadContext(), span, name, attributes); + + return () -> { + tracer.stopTrace(span); + ctx.restore(); + }; + } + + public static void span( + ThreadPool threadPool, + Tracer tracer, + String name, + ActionListener listener, + Consumer> action + ) { + var span = Span.create(); + try (var ctx = threadPool.getThreadContext().newTraceContext()) { + var context = threadPool.getThreadContext(); + tracer.startTrace(context, span, name, Map.of()); + action.accept( + ContextPreservingActionListener.wrapPreservingContext( + ActionListener.runAfter(listener, () -> tracer.stopTrace(span)), + context + ) + ); + } + } +} From ad3afc7636b9f1bb776427594b70b1ed6dd9ffd8 Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Mon, 10 Jun 2024 12:53:31 +0200 Subject: [PATCH 4/4] Add custom span to AggregationOperator --- .../common/util/concurrent/ThreadContext.java | 24 +++--- .../telemetry/tracing/TracerSpan.java | 86 +++++++++++++++++++ .../compute/operator/AggregationOperator.java | 25 +++++- .../compute/operator/Driver.java | 83 +++++++++++------- .../compute/operator/DriverTaskRunner.java | 7 +- .../compute/operator/Operator.java | 7 ++ .../xpack/esql/plugin/ComputeService.java | 9 +- .../esql/plugin/TransportEsqlQueryAction.java | 8 +- 8 files changed, 202 insertions(+), 47 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 7caf570806c0e..d65ab1b193cca 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -157,6 +157,10 @@ public StoredContext stashContextPreservingRequestHeaders(final String... reques return stashContextPreservingRequestHeaders(Set.of(requestHeaders)); } + public ThreadContextStruct getThreadContextStruct() { + return threadLocal.get(); + } + /** * When using a {@link org.elasticsearch.telemetry.tracing.Tracer} to capture activity in Elasticsearch, when a parent span is already * in progress, it is necessary to start a new context before beginning a child span. This method creates a context, @@ -705,7 +709,7 @@ public static Map buildDefaultHeaders(Settings settings) { } } - private static final class ThreadContextStruct { + public static final class ThreadContextStruct { private static final ThreadContextStruct EMPTY = new ThreadContextStruct( Collections.emptyMap(), @@ -714,12 +718,12 @@ private static final class ThreadContextStruct { false ); - private final Map requestHeaders; - private final Map transientHeaders; - private final Map> responseHeaders; - private final boolean isSystemContext; + public final Map requestHeaders; + public final Map transientHeaders; + public final Map> responseHeaders; + public final boolean isSystemContext; // saving current warning headers' size not to recalculate the size with every new warning header - private final long warningHeadersSize; + public final long warningHeadersSize; private ThreadContextStruct setSystemContext() { if (isSystemContext) { @@ -737,7 +741,7 @@ private ThreadContextStruct( this(requestHeaders, responseHeaders, transientHeaders, isSystemContext, 0L); } - private ThreadContextStruct( + public ThreadContextStruct( Map requestHeaders, Map> responseHeaders, Map transientHeaders, @@ -758,7 +762,7 @@ private ThreadContextStruct() { this(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), false); } - private ThreadContextStruct putRequest(String key, String value) { + public ThreadContextStruct putRequest(String key, String value) { Map newRequestHeaders = new HashMap<>(this.requestHeaders); putSingleHeader(key, value, newRequestHeaders); return new ThreadContextStruct(newRequestHeaders, responseHeaders, transientHeaders, isSystemContext); @@ -770,7 +774,7 @@ private static void putSingleHeader(String key, T value, Map newH } } - private ThreadContextStruct putHeaders(Map headers) { + public ThreadContextStruct putHeaders(Map headers) { if (headers.isEmpty()) { return this; } else { @@ -870,7 +874,7 @@ private ThreadContextStruct putResponse( return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize); } - private ThreadContextStruct putTransient(String key, Object value) { + public ThreadContextStruct putTransient(String key, Object value) { Map newTransient = new HashMap<>(this.transientHeaders); putSingleHeader(key, value, newTransient); return new ThreadContextStruct(requestHeaders, responseHeaders, newTransient, isSystemContext); diff --git a/server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java b/server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java index 12f05ceb95d6c..f5a9366b06084 100644 --- a/server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java +++ b/server/src/main/java/org/elasticsearch/telemetry/tracing/TracerSpan.java @@ -11,9 +11,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; +import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; @@ -53,6 +56,89 @@ public static Releasable span(ThreadPool threadPool, Tracer tracer, String name, }; } + private static class SubThreadContext implements TraceContext { + ThreadContext.ThreadContextStruct ctx; + + SubThreadContext(ThreadContext parent) { + ThreadContext.ThreadContextStruct ctx = parent.getThreadContextStruct(); + final Map newRequestHeaders = new HashMap<>(ctx.requestHeaders); + final Map newTransientHeaders = new HashMap<>(ctx.transientHeaders); + + final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER); + if (previousTraceParent != null) { + newTransientHeaders.put("parent_" + Task.TRACE_PARENT_HTTP_HEADER, previousTraceParent); + } + + final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE); + if (previousTraceState != null) { + newTransientHeaders.put("parent_" + Task.TRACE_STATE, previousTraceState); + } + + final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT); + if (previousTraceContext != null) { + newTransientHeaders.put("parent_" + Task.APM_TRACE_CONTEXT, previousTraceContext); + } + + // this is the context when this method returns + ThreadContext.ThreadContextStruct newContext = new ThreadContext.ThreadContextStruct( + newRequestHeaders, + ctx.responseHeaders, + newTransientHeaders, + ctx.isSystemContext, + ctx.warningHeadersSize + ); + + this.ctx = newContext; + } + + /** + * Puts all of the given headers into this context + */ + @Override + public void putHeader(String key, String value) { + ctx.putRequest(key, value); + } + + + @Override + public String getHeader(String key) { + return ctx.requestHeaders.get(key); + } + + /** + * Puts a transient header object into this context + */ + @Override + public void putTransient(String key, Object value) { + ctx.putTransient(key, value); + } + + /** + * Returns a transient header object or null if there is no header for the given key + */ + @SuppressWarnings("unchecked") // (T)object + public T getTransient(String key) { + return (T) ctx.transientHeaders.get(key); + } + } + + public static Releasable sameThreadContextSpan(ThreadPool threadPool, Tracer tracer, String name) { + return sameThreadContextSpan(threadPool, tracer, name, Map.of()); + } + + public static Releasable sameThreadContextSpan(ThreadPool threadPool, Tracer tracer, String name, Map attributes) { + if (tracer.isEnabled() == false) { + return () -> {}; + } + var span = Span.create(); + SubThreadContext ctx = new SubThreadContext(threadPool.getThreadContext()); + tracer.startTrace(ctx, span, name, attributes); + + return () -> { + tracer.stopTrace(span); + }; + } + public static void span( ThreadPool threadPool, Tracer tracer, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java index 20d3f0166f1cb..a97369f70612b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java @@ -18,8 +18,12 @@ import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.telemetry.tracing.Tracer; +import org.elasticsearch.telemetry.tracing.TracerSpan; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -39,11 +43,23 @@ * been added, that is, when the {@link #finish} method has been called. */ public class AggregationOperator implements Operator { - + private ThreadPool threadPool; + private Tracer tracer; private boolean finished; private Page output; private final List aggregators; private final DriverContext driverContext; + private Releasable span; + + @Override + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public void setTracer(Tracer tracer) { + this.tracer = tracer; + } /** * Nanoseconds this operator has spent running the aggregations. @@ -90,6 +106,9 @@ public boolean needsInput() { @Override public void addInput(Page page) { + if (span == null) { + span = TracerSpan.sameThreadContextSpan(threadPool, tracer, "AggregationOperator"); + } long start = System.nanoTime(); checkState(needsInput(), "Operator is already finishing"); requireNonNull(page, "page is null"); @@ -135,6 +154,8 @@ public void finish() { if (success == false && blocks != null) { Releasables.closeExpectNoException(blocks); } + span.close(); + span = null; } } @@ -149,7 +170,7 @@ public void close() { if (output != null) { Releasables.closeExpectNoException(() -> output.releaseBlocks()); } - }, Releasables.wrap(aggregators)); + }, Releasables.wrap(aggregators), span); } private static void checkState(boolean condition, String msg) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 785db826aadd6..bf5fcc3a5e4be 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -19,6 +19,9 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.telemetry.tracing.Tracer; +import org.elasticsearch.telemetry.tracing.TracerSpan; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Iterator; @@ -50,6 +53,16 @@ public class Driver implements Releasable, Describable { public static final TimeValue DEFAULT_STATUS_INTERVAL = TimeValue.timeValueSeconds(1); private final String sessionId; + private ThreadPool threadPool; + private Tracer tracer; + + public void setTracer(Tracer tracer) { + this.tracer = tracer; + } + + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } /** * The wall clock time when this driver was created in milliseconds since epoch. @@ -170,36 +183,48 @@ public DriverContext driverContext() { * thread to do other work instead of blocking or busy-spinning on the blocked operator. */ SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) { - long maxTimeNanos = maxTime.nanos(); - long startTime = nowSupplier.getAsLong(); - long nextStatus = startTime + statusNanos; - int iter = 0; - while (true) { - SubscribableListener fut = runSingleLoopIteration(); - iter++; - if (fut.isDone() == false) { - updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC); - return fut; - } - if (isFinished()) { - finishNanos = nowSupplier.getAsLong(); - updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE); - driverContext.finish(); - Releasables.close(releasable, driverContext.getSnapshot()); - return Operator.NOT_BLOCKED; - } - long now = nowSupplier.getAsLong(); - if (iter >= maxIterations) { - updateStatus(now - startTime, iter, DriverStatus.Status.WAITING); - return Operator.NOT_BLOCKED; + try ( + var span = TracerSpan.span( + threadPool, + tracer, + "Driver.run" + ) + ) { + long maxTimeNanos = maxTime.nanos(); + long startTime = nowSupplier.getAsLong(); + long nextStatus = startTime + statusNanos; + int iter = 0; + for (Operator op: activeOperators) { + op.setThreadPool(threadPool); + op.setTracer(tracer); } - if (now - startTime >= maxTimeNanos) { - updateStatus(now - startTime, iter, DriverStatus.Status.WAITING); - return Operator.NOT_BLOCKED; - } - if (now > nextStatus) { - updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING); - nextStatus = now + statusNanos; + while (true) { + SubscribableListener fut = runSingleLoopIteration(); + iter++; + if (fut.isDone() == false) { + updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC); + return fut; + } + if (isFinished()) { + finishNanos = nowSupplier.getAsLong(); + updateStatus(finishNanos - startTime, iter, DriverStatus.Status.DONE); + driverContext.finish(); + Releasables.close(releasable, driverContext.getSnapshot()); + return Operator.NOT_BLOCKED; + } + long now = nowSupplier.getAsLong(); + if (iter >= maxIterations) { + updateStatus(now - startTime, iter, DriverStatus.Status.WAITING); + return Operator.NOT_BLOCKED; + } + if (now - startTime >= maxTimeNanos) { + updateStatus(now - startTime, iter, DriverStatus.Status.WAITING); + return Operator.NOT_BLOCKED; + } + if (now > nextStatus) { + updateStatus(now - startTime, iter, DriverStatus.Status.RUNNING); + nextStatus = now + statusNanos; + } } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java index 38d879f8f7ad4..d2417113724eb 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverTaskRunner.java @@ -17,6 +17,9 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; +import org.elasticsearch.telemetry.tracing.TracerSpan; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; @@ -42,10 +45,12 @@ public DriverTaskRunner(TransportService transportService, Executor executor) { transportService.registerRequestHandler(ACTION_NAME, executor, DriverRequest::new, new DriverRequestHandler(transportService)); } - public void executeDrivers(Task parentTask, List drivers, Executor executor, ActionListener listener) { + public void executeDrivers(Task parentTask, List drivers, Executor executor, ActionListener listener, Tracer tracer) { var runner = new DriverRunner(transportService.getThreadPool().getThreadContext()) { @Override protected void start(Driver driver, ActionListener driverListener) { + driver.setTracer(tracer); + driver.setThreadPool(transportService.getThreadPool()); transportService.sendChildRequest( transportService.getLocalNode(), ACTION_NAME, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java index 1038277c39fe1..1287c10e88c14 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Operator.java @@ -8,12 +8,17 @@ package org.elasticsearch.compute.operator; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.VersionedNamedWriteable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasable; +import org.elasticsearch.telemetry.tracing.Traceable; +import org.elasticsearch.telemetry.tracing.Tracer; +import org.elasticsearch.telemetry.tracing.TracerSpan; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.ToXContentObject; /** @@ -27,6 +32,8 @@ * {@link org.elasticsearch.compute} */ public interface Operator extends Releasable { + default void setTracer(Tracer tracer) {} + default void setThreadPool(ThreadPool threadPool) {} /** * Target number of bytes in a page. By default we'll try and size pages * so that they contain this many bytes. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4ebc4af258134..da4ed58fdcdc1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -53,6 +53,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; @@ -102,6 +103,7 @@ public record Result(List pages, List profiles) {} private final ExchangeService exchangeService; private final EnrichLookupService enrichLookupService; private final ClusterService clusterService; + private final Tracer tracer; public ComputeService( SearchService searchService, @@ -111,7 +113,8 @@ public ComputeService( ClusterService clusterService, ThreadPool threadPool, BigArrays bigArrays, - BlockFactory blockFactory + BlockFactory blockFactory, + Tracer tracer ) { this.searchService = searchService; this.transportService = transportService; @@ -129,6 +132,7 @@ public ComputeService( this.exchangeService = exchangeService; this.enrichLookupService = enrichLookupService; this.clusterService = clusterService; + this.tracer = tracer; } public void execute( @@ -454,7 +458,8 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, task, drivers, transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME), - listenerCollectingStatus + listenerCollectingStatus, + tracer ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 28191a394e69c..b75b6ac73a8fe 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackPlugin; @@ -75,8 +76,8 @@ public TransportEsqlQueryAction( BigArrays bigArrays, BlockFactory blockFactory, Client client, - NamedWriteableRegistry registry - + NamedWriteableRegistry registry, + Tracer tracer ) { // TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916 super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); @@ -95,7 +96,8 @@ public TransportEsqlQueryAction( clusterService, threadPool, bigArrays, - blockFactory + blockFactory, + tracer ); this.asyncTaskManagementService = new AsyncTaskManagementService<>( XPackPlugin.ASYNC_RESULTS_INDEX,