Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump co.elastic.clients:elasticsearch-java from 8.8.1 to 8.9.0 #3283

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -38,7 +38,7 @@ public interface DetachedThreadLocal<T> {
@Nullable
T getAndRemove();

void set(T value);
void set(@Nullable T value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[for reviewer] this annotation was already present in interface implementation.


void remove();

Expand Down
Expand Up @@ -66,7 +66,7 @@ public static class ElasticsearchRestClientAsyncAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static Object[] onBeforeExecute(@Advice.Argument(0) Request request,
@Advice.Argument(1) ResponseListener responseListener) {
Span<?> span = helper.createClientSpan(request, request.getMethod(), request.getEndpoint(), request.getEntity());
Span<?> span = helper.createClientSpan(request.getMethod(), request.getEndpoint(), request.getEntity());
if (span != null) {
Object[] ret = new Object[2];
ret[0] = span;
Expand Down
Expand Up @@ -60,7 +60,7 @@ public static class ElasticsearchRestClientSyncAdvice {
@Nullable
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static Object onBeforeExecute(@Advice.Argument(0) Request request) {
return helper.createClientSpan(request, request.getMethod(), request.getEndpoint(), request.getEntity());
return helper.createClientSpan(request.getMethod(), request.getEndpoint(), request.getEntity());
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false)
Expand Down
Expand Up @@ -11,7 +11,7 @@
<name>${project.groupId}:${project.artifactId}</name>

<properties>
<version.elasticsearch-java>8.8.1</version.elasticsearch-java>
<version.elasticsearch-java>8.9.0</version.elasticsearch-java>
<apm-agent-parent.base.dir>${project.basedir}/../../..</apm-agent-parent.base.dir>
</properties>

Expand Down
Expand Up @@ -21,35 +21,35 @@
import co.elastic.apm.agent.esrestclient.ElasticsearchRestClientInstrumentation;
import co.elastic.apm.agent.esrestclient.ElasticsearchRestClientInstrumentationHelper;
import co.elastic.clients.transport.Endpoint;
import co.elastic.clients.transport.TransportOptions;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.elasticsearch.client.Request;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static net.bytebuddy.matcher.ElementMatchers.*;

/**
* Instruments {@link co.elastic.clients.transport.rest_client.RestClientTransport#prepareLowLevelRequest(Object, Endpoint, TransportOptions)}.
* Instruments:
* <ul>
* <li>{@link co.elastic.clients.transport.rest_client.RestClientTransport#performRequest}</li>
* <li>{@link co.elastic.clients.transport.rest_client.RestClientTransport#performRequestAsync} </li>
* </ul>
* To store the current endpoint ID in a thread-local storage
*/
@SuppressWarnings("JavadocReference")
public class RestClientTransportInstrumentation extends ElasticsearchRestClientInstrumentation {

@Override
public ElementMatcher<? super TypeDescription> getTypeMatcher() {
return named("co.elastic.clients.transport.rest_client.RestClientTransport");
return named("co.elastic.clients.transport.rest_client.RestClientTransport") // 8.x up to 8.8.x
.or(named("co.elastic.clients.transport.ElasticsearchTransportBase")); // 8.9.0+
}

@Override
public ElementMatcher<? super MethodDescription> getMethodMatcher() {
return named("prepareLowLevelRequest")
return named("performRequest")
.or(named("performRequestAsync"))
.and(takesArguments(3))
.and(takesArgument(1, named("co.elastic.clients.transport.Endpoint")))
.and(returns(named("org.elasticsearch.client.Request")));
.and(takesArgument(1, named("co.elastic.clients.transport.Endpoint")));
}

@Override
Expand All @@ -61,10 +61,14 @@ public static class RestClientTransportAdvice {

private static final ElasticsearchRestClientInstrumentationHelper helper = ElasticsearchRestClientInstrumentationHelper.get();

@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[for reviewer] instead of keeping track of the request object (which is wrapped/replaced at least twice in the new implementation), we just use a thread-local value to store the endpoint ID for the duration of the performRequest* method execution, which is enough to create the span with the correct attributes.

public static void onEnter(@Advice.Argument(1) Endpoint<?, ?, ?> endpoint) {
helper.setCurrentEndpoint(endpoint.id());
}

@Advice.OnMethodExit(suppress = Throwable.class, inline = false)
public static void onPrepareLowLevelRequest(@Advice.Argument(1) Endpoint<?, ?, ?> endpoint, @Advice.Return Request request) {
helper.registerEndpointId(request, endpoint.id());
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false)
public static void onExit() {
helper.clearCurrentEndpoint();
}

}
Expand Down
Expand Up @@ -875,14 +875,16 @@ public final class ElasticsearchEndpointMap {
private ElasticsearchEndpointMap() {
}

private static void initEndpoint(
Map<String, ElasticsearchEndpointDefinition> map,
String endpointId,
boolean isSearchEndpoint,
String... routes) {
ElasticsearchEndpointDefinition endpointDef =
new ElasticsearchEndpointDefinition(endpointId, routes, isSearchEndpoint);
private static void initEndpoint(Map<String, ElasticsearchEndpointDefinition> map,
String endpointId,
boolean isSearchEndpoint,
String... routes) {

ElasticsearchEndpointDefinition endpointDef = new ElasticsearchEndpointDefinition(endpointId, routes, isSearchEndpoint);
map.put(endpointId, endpointDef);

// duplicate entries for allocation-free lookup with 'es/' prefix.
map.put("es/" + endpointId, endpointDef);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[for reviewer] this allows to make the lookup allocation free (endpoint ID was modified before lookup in inregisterEndpointId).

}

@Nullable
Expand Down
Expand Up @@ -19,19 +19,19 @@
package co.elastic.apm.agent.esrestclient;

import co.elastic.apm.agent.common.util.WildcardMatcher;
import co.elastic.apm.agent.sdk.internal.util.IOUtils;
import co.elastic.apm.agent.sdk.internal.util.LoggerUtils;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.sdk.weakconcurrent.DetachedThreadLocal;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.Tracer;
import co.elastic.apm.agent.tracer.pooling.ObjectPool;
import co.elastic.apm.agent.sdk.internal.util.IOUtils;
import co.elastic.apm.agent.sdk.internal.util.LoggerUtils;
import co.elastic.apm.agent.tracer.pooling.Allocator;
import co.elastic.apm.agent.tracer.pooling.ObjectPool;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Response;
Expand All @@ -43,7 +43,8 @@

public class ElasticsearchRestClientInstrumentationHelper {

private static final WeakMap<Object, ElasticsearchEndpointDefinition> requestEndpointMap = WeakConcurrent.buildMap();
private static final DetachedThreadLocal<ElasticsearchEndpointDefinition> currentRequestEndpoint = WeakConcurrent.buildThreadLocal();

private static final Logger logger = LoggerFactory.getLogger(ElasticsearchRestClientInstrumentationHelper.class);

private static final Logger unsupportedOperationOnceLogger = LoggerUtils.logOnce(logger);
Expand Down Expand Up @@ -76,29 +77,18 @@ public ResponseListenerWrapper createInstance() {
}
}

public void registerEndpointId(Object requestObj, String endpointId) {
if (endpointId.startsWith("es/") && endpointId.length() > 3) {
endpointId = endpointId.substring(3);
}
ElasticsearchEndpointDefinition endpoint = ElasticsearchEndpointMap.get(endpointId);
if (endpoint != null) {
requestEndpointMap.put(requestObj, endpoint);
}
public void setCurrentEndpoint(String endpointId) {
currentRequestEndpoint.set(ElasticsearchEndpointMap.get(endpointId));
}

@Nullable
public Span<?> createClientSpan(Object requestObj, String method, String httpPath, @Nullable HttpEntity httpEntity) {
ElasticsearchEndpointDefinition endpoint = requestEndpointMap.remove(requestObj);
return createClientSpan(method, httpPath, httpEntity, endpoint);
public void clearCurrentEndpoint() {
currentRequestEndpoint.remove();
}

@Nullable
public Span<?> createClientSpan(String method, String httpPath, @Nullable HttpEntity httpEntity) {
return createClientSpan(method, httpPath, httpEntity, null);
}
ElasticsearchEndpointDefinition endpoint = currentRequestEndpoint.getAndRemove();

@Nullable
private Span<?> createClientSpan(String method, String httpPath, @Nullable HttpEntity httpEntity, @Nullable ElasticsearchEndpointDefinition endpoint) {
Span<?> span = tracer.currentContext().createExitSpan();

// Don't record nested spans. In 5.x clients the instrumented sync method is calling the instrumented async method
Expand Down