Skip to content

Commit

Permalink
Added ES client 8.x instrumentation (#3157)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Alexander Wert <AlexanderWert@users.noreply.github.com>
Co-authored-by: Jonas Kunz <jonas.kunz@elastic.co>
  • Loading branch information
AlexanderWert and JonasKunz committed Jul 26, 2023
1 parent 9bdb3aa commit 7d59455
Show file tree
Hide file tree
Showing 22 changed files with 1,822 additions and 326 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -35,6 +35,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
===== Features
* Added W3C baggage propagation - {pull}3236[#3236], {pull}3248[#3248]
* Added support for baggage in OpenTelemetry bridge - {pull}3249[#3249]
* Improved span naming and attribute collection for 7.16+ elasticsearch clients - {pull}3157[#3157]
[[release-notes-1.x]]
=== Java Agent version 1.x
Expand Down
5 changes: 5 additions & 0 deletions apm-agent-builds/pom.xml
Expand Up @@ -97,6 +97,11 @@
<artifactId>apm-es-restclient-plugin-7_x</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>apm-es-restclient-plugin-8_x</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>apm-grails-plugin</artifactId>
Expand Down

This file was deleted.

Expand Up @@ -19,8 +19,8 @@
package co.elastic.apm.agent.esrestclient.v5_6;

import co.elastic.apm.agent.esrestclient.AbstractEsClientInstrumentationTest;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.tracer.Outcome;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
Expand Down Expand Up @@ -65,14 +65,13 @@
@RunWith(Parameterized.class)
public class ElasticsearchRestClientInstrumentationIT extends AbstractEsClientInstrumentationTest {

private static final String ELASTICSEARCH_CONTAINER_VERSION = "docker.elastic.co/elasticsearch/elasticsearch:5.6.0";
protected static final String USER_NAME = "elastic";
protected static final String PASSWORD = "changeme";

protected static final String DOC_TYPE = "doc";
private static RestHighLevelClient client;
private static final String ELASTICSEARCH_CONTAINER_VERSION = "docker.elastic.co/elasticsearch/elasticsearch:5.6.0";
@SuppressWarnings("NullableProblems")
protected static RestClient lowLevelClient;
private static RestHighLevelClient client;

public ElasticsearchRestClientInstrumentationIT(boolean async) {
this.async = async;
Expand All @@ -86,7 +85,7 @@ public static void startElasticsearchContainerAndClient() throws IOException {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(USER_NAME, PASSWORD));

RestClientBuilder builder = RestClient.builder(HttpHost.create(container.getHttpHostAddress()))
RestClientBuilder builder = RestClient.builder(HttpHost.create(container.getHttpHostAddress()))
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
lowLevelClient = builder.build();
client = new RestHighLevelClient(lowLevelClient);
Expand Down Expand Up @@ -124,14 +123,18 @@ public void testCreateAndDeleteIndex() throws IOException, ExecutionException {
// Create an Index
doPerformRequest("PUT", "/" + SECOND_INDEX);

validateSpanContentAfterIndexCreateRequest();
validateSpan()
.method("PUT").pathName("/%s", SECOND_INDEX)
.check();

// Delete the index
reporter.reset();

doPerformRequest("DELETE", "/" + SECOND_INDEX);

validateSpanContentAfterIndexDeleteRequest();
validateSpan()
.method("DELETE").pathName("/%s", SECOND_INDEX)
.check();

assertThat(reporter.getFirstSpan().getOutcome()).isEqualTo(Outcome.SUCCESS);
}
Expand All @@ -147,10 +150,10 @@ public void testDocumentScenario() throws IOException, ExecutionException, Inter
).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE));
assertThat(ir.status().getStatus()).isEqualTo(201);


List<Span> spans = reporter.getSpans();
assertThat(spans).hasSize(1);
validateSpanContent(spans.get(0), String.format("Elasticsearch: PUT /%s/%s/%s", INDEX, DOC_TYPE, DOC_ID), 201, "PUT");
validateSpan()
.method("PUT").pathName("/%s/%s/%s", INDEX, DOC_TYPE, DOC_ID)
.statusCode(201)
.check();

// Search the index
reporter.reset();
Expand All @@ -165,12 +168,10 @@ public void testDocumentScenario() throws IOException, ExecutionException, Inter
assertThat(sr.getHits().totalHits).isEqualTo(1L);
assertThat(sr.getHits().getAt(0).getSourceAsMap().get(FOO)).isEqualTo(BAR);


spans = reporter.getSpans();
assertThat(spans).hasSize(1);
Span searchSpan = spans.get(0);
validateSpanContent(searchSpan, String.format("Elasticsearch: GET /%s/_search", INDEX), 200, "GET");
validateDbContextContent(searchSpan, "{\"from\":0,\"size\":5,\"query\":{\"term\":{\"foo\":{\"value\":\"bar\",\"boost\":1.0}}}}");
validateSpan()
.method("GET").pathName("/%s/_search", INDEX)
.expectStatement("{\"from\":0,\"size\":5,\"query\":{\"term\":{\"foo\":{\"value\":\"bar\",\"boost\":1.0}}}}")
.check();

// Now update and re-search
reporter.reset();
Expand All @@ -184,11 +185,11 @@ public void testDocumentScenario() throws IOException, ExecutionException, Inter
assertThat(sr.getHits().getAt(0).getSourceAsMap().get(FOO)).isEqualTo(BAZ);


spans = reporter.getSpans();
List<Span> spans = reporter.getSpans();
assertThat(spans).hasSize(2);
boolean updateSpanFound = false;
for(Span span: spans) {
if(span.getNameAsString().contains("_update")) {
for (Span span : spans) {
if (span.getNameAsString().contains("_update")) {
updateSpanFound = true;
break;
}
Expand All @@ -198,7 +199,10 @@ public void testDocumentScenario() throws IOException, ExecutionException, Inter
// Finally - delete the document
reporter.reset();
DeleteResponse dr = doDelete(new DeleteRequest(INDEX, DOC_TYPE, DOC_ID));
validateSpanContent(spans.get(0), String.format("Elasticsearch: DELETE /%s/%s/%s", INDEX, DOC_TYPE, DOC_ID), 200, "DELETE");

validateSpan()
.method("DELETE").pathName("/%s/%s/%s", INDEX, DOC_TYPE, DOC_ID)
.check();
}

@Test
Expand All @@ -212,11 +216,10 @@ public void testScenarioAsBulkRequest() throws IOException, ExecutionException,
))
.add(new DeleteRequest(INDEX, DOC_TYPE, "2")));

validateSpanContentAfterBulkRequest();
}

private interface ClientMethod<Req, Res> {
void invoke(Req request, ActionListener<Res> listener);
validateSpan()
.method("POST")
.pathName("/_bulk")
.check();
}

private <Req, Res> Res invokeAsync(Req request, ClientMethod<Req, Res> method) throws InterruptedException, ExecutionException {
Expand Down Expand Up @@ -280,7 +283,6 @@ private BulkResponse doBulk(BulkRequest bulkRequest) throws IOException, Executi
return client.bulk(bulkRequest);
}


private Response doPerformRequest(String method, String path) throws IOException, ExecutionException {
if (async) {
final CompletableFuture<Response> resultFuture = new CompletableFuture<>();
Expand All @@ -304,4 +306,9 @@ public void onFailure(Exception exception) {
return lowLevelClient.performRequest(method, path);
}


private interface ClientMethod<Req, Res> {
void invoke(Req request, ActionListener<Res> listener);
}

}
Expand Up @@ -66,7 +66,7 @@ public static class ElasticsearchRestClientAsyncAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static Object[] onBeforeExecute(@Advice.Argument(0) Request request,
@Advice.Argument(1) ResponseListener responseListener) {
Span<?> span = helper.createClientSpan(request.getMethod(), request.getEndpoint(), request.getEntity());
Span<?> span = helper.createClientSpan(request, request.getMethod(), request.getEndpoint(), request.getEntity());
if (span != null) {
Object[] ret = new Object[2];
ret[0] = span;
Expand Down
Expand Up @@ -60,7 +60,7 @@ public static class ElasticsearchRestClientSyncAdvice {
@Nullable
@Advice.OnMethodEnter(suppress = Throwable.class, inline = false)
public static Object onBeforeExecute(@Advice.Argument(0) Request request) {
return helper.createClientSpan(request.getMethod(), request.getEndpoint(), request.getEntity());
return helper.createClientSpan(request, request.getMethod(), request.getEndpoint(), request.getEntity());
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false)
Expand Down

0 comments on commit 7d59455

Please sign in to comment.