Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ testClusters.register("runTask") {
systemProperty 'ingest.geoip.downloader.enabled.default', 'true'
setting 'xpack.security.enabled', 'true'
keystore 'bootstrap.password', 'password'
setting 'telemetry.tracing.enabled', 'true'
setting 'telemetry.agent.server_url', 'INSERT_SERVER_URL_HERE'
keystore 'telemetry.api_key', 'INSERT_API_KEY_HERE'
user username: 'elastic-admin', password: 'elastic-password', role: '_es_test_root'
numberOfNodes = 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public void setLabelFilters(List<String> labelFilters) {
this.labelFilterAutomaton = buildAutomaton(labelFilters, List.of());
}

@Override
public boolean isEnabled() {
return enabled;
}

// package-private for testing
CharacterRunAutomaton getLabelFilterAutomaton() {
return labelFilterAutomaton;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ public StoredContext stashContextPreservingRequestHeaders(final String... reques
return stashContextPreservingRequestHeaders(Set.of(requestHeaders));
}

public ThreadContextStruct getThreadContextStruct() {
return threadLocal.get();
}

/**
* When using a {@link org.elasticsearch.telemetry.tracing.Tracer} to capture activity in Elasticsearch, when a parent span is already
* in progress, it is necessary to start a new context before beginning a child span. This method creates a context,
Expand Down Expand Up @@ -705,7 +709,7 @@ public static Map<String, String> buildDefaultHeaders(Settings settings) {
}
}

private static final class ThreadContextStruct {
public static final class ThreadContextStruct {

private static final ThreadContextStruct EMPTY = new ThreadContextStruct(
Collections.emptyMap(),
Expand All @@ -714,12 +718,12 @@ private static final class ThreadContextStruct {
false
);

private final Map<String, String> requestHeaders;
private final Map<String, Object> transientHeaders;
private final Map<String, Set<String>> responseHeaders;
private final boolean isSystemContext;
public final Map<String, String> requestHeaders;
public final Map<String, Object> transientHeaders;
public final Map<String, Set<String>> responseHeaders;
public final boolean isSystemContext;
// saving current warning headers' size not to recalculate the size with every new warning header
private final long warningHeadersSize;
public final long warningHeadersSize;

private ThreadContextStruct setSystemContext() {
if (isSystemContext) {
Expand All @@ -737,7 +741,7 @@ private ThreadContextStruct(
this(requestHeaders, responseHeaders, transientHeaders, isSystemContext, 0L);
}

private ThreadContextStruct(
public ThreadContextStruct(
Map<String, String> requestHeaders,
Map<String, Set<String>> responseHeaders,
Map<String, Object> transientHeaders,
Expand All @@ -758,7 +762,7 @@ private ThreadContextStruct() {
this(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), false);
}

private ThreadContextStruct putRequest(String key, String value) {
public ThreadContextStruct putRequest(String key, String value) {
Map<String, String> newRequestHeaders = new HashMap<>(this.requestHeaders);
putSingleHeader(key, value, newRequestHeaders);
return new ThreadContextStruct(newRequestHeaders, responseHeaders, transientHeaders, isSystemContext);
Expand All @@ -770,7 +774,7 @@ private static <T> void putSingleHeader(String key, T value, Map<String, T> newH
}
}

private ThreadContextStruct putHeaders(Map<String, String> headers) {
public ThreadContextStruct putHeaders(Map<String, String> headers) {
if (headers.isEmpty()) {
return this;
} else {
Expand Down Expand Up @@ -870,7 +874,7 @@ private ThreadContextStruct putResponse(
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
}

private ThreadContextStruct putTransient(String key, Object value) {
public ThreadContextStruct putTransient(String key, Object value) {
Map<String, Object> newTransient = new HashMap<>(this.transientHeaders);
putSingleHeader(key, value, newTransient);
return new ThreadContextStruct(requestHeaders, responseHeaders, newTransient, isSystemContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public interface Tracer {
*/
Releasable withScope(Traceable traceable);

/**
* @return {@code if service is enabled}
*/
boolean isEnabled();

/**
* A Tracer implementation that does nothing. This is used when no tracer is configured,
* in order to avoid null checks everywhere.
Expand Down Expand Up @@ -178,6 +183,11 @@ public void setAttribute(Traceable traceable, String key, String value) {}
public Releasable withScope(Traceable traceable) {
return () -> {};
}

@Override
public boolean isEnabled() {
return false;
}
};

interface AttributeKeys {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.telemetry.tracing;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class TracerSpan {

private record Span(String spanId) implements Traceable {

@Override
public String getSpanId() {
return spanId;
}

public static Span create() {
return new Span(UUIDs.randomBase64UUID());
}
}

public static Releasable span(ThreadPool threadPool, Tracer tracer, String name) {
return span(threadPool, tracer, name, Map.of());
}

/**
* Creates a new span for a synchronous block of code.
* @return a span that needs to be released once block of code is completed
*/
public static Releasable span(ThreadPool threadPool, Tracer tracer, String name, Map<String, Object> attributes) {
if (tracer.isEnabled() == false) {
return () -> {};
}
var span = Span.create();
var ctx = threadPool.getThreadContext().newTraceContext();
tracer.startTrace(threadPool.getThreadContext(), span, name, attributes);

return () -> {
tracer.stopTrace(span);
ctx.restore();
};
}

private static class SubThreadContext implements TraceContext {
ThreadContext.ThreadContextStruct ctx;

SubThreadContext(ThreadContext parent) {
ThreadContext.ThreadContextStruct ctx = parent.getThreadContextStruct();
final Map<String, String> newRequestHeaders = new HashMap<>(ctx.requestHeaders);
final Map<String, Object> newTransientHeaders = new HashMap<>(ctx.transientHeaders);

final String previousTraceParent = newRequestHeaders.remove(Task.TRACE_PARENT_HTTP_HEADER);
if (previousTraceParent != null) {
newTransientHeaders.put("parent_" + Task.TRACE_PARENT_HTTP_HEADER, previousTraceParent);
}

final String previousTraceState = newRequestHeaders.remove(Task.TRACE_STATE);
if (previousTraceState != null) {
newTransientHeaders.put("parent_" + Task.TRACE_STATE, previousTraceState);
}

final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT);
if (previousTraceContext != null) {
newTransientHeaders.put("parent_" + Task.APM_TRACE_CONTEXT, previousTraceContext);
}

// this is the context when this method returns
ThreadContext.ThreadContextStruct newContext = new ThreadContext.ThreadContextStruct(
newRequestHeaders,
ctx.responseHeaders,
newTransientHeaders,
ctx.isSystemContext,
ctx.warningHeadersSize
);

this.ctx = newContext;
}

/**
* Puts all of the given headers into this context
*/
@Override
public void putHeader(String key, String value) {
ctx.putRequest(key, value);
}


@Override
public String getHeader(String key) {
return ctx.requestHeaders.get(key);
}

/**
* Puts a transient header object into this context
*/
@Override
public void putTransient(String key, Object value) {
ctx.putTransient(key, value);
}

/**
* Returns a transient header object or <code>null</code> if there is no header for the given key
*/
@SuppressWarnings("unchecked") // (T)object
public <T> T getTransient(String key) {
return (T) ctx.transientHeaders.get(key);
}
}

public static Releasable sameThreadContextSpan(ThreadPool threadPool, Tracer tracer, String name) {
return sameThreadContextSpan(threadPool, tracer, name, Map.of());
}

public static Releasable sameThreadContextSpan(ThreadPool threadPool, Tracer tracer, String name, Map<String, Object> attributes) {
if (tracer.isEnabled() == false) {
return () -> {};
}
var span = Span.create();
SubThreadContext ctx = new SubThreadContext(threadPool.getThreadContext());
tracer.startTrace(ctx, span, name, attributes);

return () -> {
tracer.stopTrace(span);
};
}

public static <T> void span(
ThreadPool threadPool,
Tracer tracer,
String name,
ActionListener<T> listener,
Consumer<ActionListener<T>> action
) {
var span = Span.create();
try (var ctx = threadPool.getThreadContext().newTraceContext()) {
var context = threadPool.getThreadContext();
tracer.startTrace(context, span, name, Map.of());
action.accept(
ContextPreservingActionListener.wrapPreservingContext(
ActionListener.runAfter(listener, () -> tracer.stopTrace(span)),
context
)
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.telemetry.tracing.TracerSpan;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -39,11 +43,23 @@
* been added, that is, when the {@link #finish} method has been called.
*/
public class AggregationOperator implements Operator {

private ThreadPool threadPool;
private Tracer tracer;
private boolean finished;
private Page output;
private final List<Aggregator> aggregators;
private final DriverContext driverContext;
private Releasable span;

@Override
public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public void setTracer(Tracer tracer) {
this.tracer = tracer;
}

/**
* Nanoseconds this operator has spent running the aggregations.
Expand Down Expand Up @@ -90,6 +106,9 @@ public boolean needsInput() {

@Override
public void addInput(Page page) {
if (span == null) {
span = TracerSpan.sameThreadContextSpan(threadPool, tracer, "AggregationOperator");
}
long start = System.nanoTime();
checkState(needsInput(), "Operator is already finishing");
requireNonNull(page, "page is null");
Expand Down Expand Up @@ -135,6 +154,8 @@ public void finish() {
if (success == false && blocks != null) {
Releasables.closeExpectNoException(blocks);
}
span.close();
span = null;
}
}

Expand All @@ -149,7 +170,7 @@ public void close() {
if (output != null) {
Releasables.closeExpectNoException(() -> output.releaseBlocks());
}
}, Releasables.wrap(aggregators));
}, Releasables.wrap(aggregators), span);
}

private static void checkState(boolean condition, String msg) {
Expand Down
Loading