Skip to content

Commit

Permalink
JVMCBC-1314: Pathfind OpenTelemetry integration with CNG
Browse files Browse the repository at this point in the history
This adds the GRPC automatic instrumentation from package
opentelemetry-grpc-1.6, which will automatically create
spans such as `couchbase.kv.v1.KvService/Upsert` underneath
dispatch_to_server.

Streaming services aren't touched, as they don't support
OpenTelemetry right now anyway.  (JVMCBC-1321)

Change-Id: I9ab70dadb94fab678a8f616aa32f6429aa911fbd
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/192537
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: David Nault <david.nault@couchbase.com>
Reviewed-by: Michael Reiche <michael.reiche@couchbase.com>
  • Loading branch information
programmatix committed Jul 21, 2023
1 parent 987c3d5 commit 37a82f5
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 24 deletions.
9 changes: 9 additions & 0 deletions core-io-deps/pom.xml
Expand Up @@ -265,6 +265,15 @@
<pattern>io.perfmark</pattern>
<shadedPattern>${shaded.package.prefix}io.perfmark</shadedPattern>
</relocation>

<!-- For tracing-opentelemetry, we need to combine the user's unshaded OpenTelemetry object with a shaded
ManagedChannelBuilder, via the GRPC instrumentation library. The only solution that works is to
a) include the instrumentation library in core-io-deps so it can be shaded, and b) relocate only the
io.opentelemetry.instrumentation.grpc package - so that the shaded instrumentation library continues to
depend on the unshaded io.opentelemetry.OpenTelemetry -->
<relocation>
<pattern>io.opentelemetry.instrumentation.grpc</pattern>
<shadedPattern>${shaded.package.prefix}io.opentelemetry.instrumentation.grpc</shadedPattern>
</relocation>
</relocations>
<transformers>
<!-- Required for gRPC -->
Expand Down
Expand Up @@ -47,6 +47,7 @@
import com.couchbase.client.core.error.SecurityException;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.protostellar.GrpcAwareRequestTracer;
import com.couchbase.client.core.protostellar.ProtostellarContext;
import com.couchbase.client.core.protostellar.ProtostellarStatsCollector;
import com.couchbase.client.core.util.Deadline;
Expand Down Expand Up @@ -228,6 +229,11 @@ private ManagedChannel channel(ProtostellarContext ctx) {
// Retry strategies to be determined, but presumably we will need something custom rather than what GRPC provides
.disableRetry();

if (ctx.environment().requestTracer() != null
&& ctx.environment().requestTracer() instanceof GrpcAwareRequestTracer) {
((GrpcAwareRequestTracer) ctx.environment().requestTracer()).registerGrpc(builder);
}

// JVMCBC-1187: experimental code for performance testing that will be removed pre-GA.
// Testing indicates this load balancing makes zero difference anyway - we always end up with one channel and one subchannel per ManagedChannel regardless.
String loadBalancingCount = System.getProperty("com.couchbase.protostellar.loadBalancing");
Expand Down
Expand Up @@ -33,12 +33,14 @@
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import static com.couchbase.client.core.protostellar.CoreProtostellarUtil.handleShutdownAsync;
import static com.couchbase.client.core.protostellar.CoreProtostellarUtil.handleShutdownBlocking;
import static com.couchbase.client.core.protostellar.CoreProtostellarUtil.handleShutdownReactive;
import static com.couchbase.client.core.util.ProtostellarUtil.activateSpan;

/**
* Used to generically handle the core functionality of sending a GRPC request over Protostellar and handling the response.
Expand Down Expand Up @@ -75,21 +77,22 @@ TSdkResult blocking(CoreProtostellar core,
ProtostellarEndpoint endpoint = core.endpoint();
long start = System.nanoTime();
RequestSpan dispatchSpan = createDispatchSpan(core, request, endpoint);

AutoCloseable scope = activateSpan(Optional.empty(), dispatchSpan, core.context().environment().requestTracer());

try {
// Make the Protostellar call.
TGrpcResponse response = executeBlockingGrpcCall.apply(endpoint);

request.dispatchDuration(System.nanoTime() - start);
if (dispatchSpan != null) {
dispatchSpan.end();
}
handleDispatchSpan(null, dispatchSpan, scope);
TSdkResult result = convertResponse.apply(response);
request.raisedResponseToUser(null);
return result;
} catch (Throwable t) {
request.dispatchDuration(System.nanoTime() - start);
ProtostellarRequestBehaviour behaviour = convertException.apply(t);
handleDispatchSpan(behaviour, dispatchSpan);
handleDispatchSpan(behaviour, dispatchSpan, scope);
if (behaviour.retryDuration() != null) {
try {
Thread.sleep(behaviour.retryDuration().toMillis());
Expand Down Expand Up @@ -143,16 +146,16 @@ void asyncInternal(CompletableFuture<TSdkResult> ret,
RequestSpan dispatchSpan = createDispatchSpan(core, request, endpoint);
long start = System.nanoTime();

AutoCloseable scope = activateSpan(Optional.empty(), dispatchSpan, core.context().environment().requestTracer());

// Make the Protostellar call.
ListenableFuture<TGrpcResponse> response = executeFutureGrpcCall.apply(endpoint);

Futures.addCallback(response, new FutureCallback<TGrpcResponse>() {
@Override
public void onSuccess(TGrpcResponse response) {
request.dispatchDuration(System.nanoTime() - start);
if (dispatchSpan != null) {
dispatchSpan.end();
}
handleDispatchSpan(null, dispatchSpan, scope);

TSdkResult result = convertResponse.apply(response);

Expand All @@ -169,7 +172,7 @@ public void onSuccess(TGrpcResponse response) {
public void onFailure(Throwable t) {
request.dispatchDuration(System.nanoTime() - start);
ProtostellarRequestBehaviour behaviour = convertException.apply(t);
handleDispatchSpan(behaviour, dispatchSpan);
handleDispatchSpan(behaviour, dispatchSpan, scope);
if (behaviour.retryDuration() != null) {
boolean unableToSchedule = core.context().environment().timer().schedule(() -> {
asyncInternal(ret, core, request, executeFutureGrpcCall, convertResponse, convertException);
Expand Down Expand Up @@ -241,6 +244,8 @@ void reactiveInternal(Sinks.One<TSdkResult> ret,
RequestSpan dispatchSpan = createDispatchSpan(core, request, endpoint);
long start = System.nanoTime();

AutoCloseable scope = activateSpan(Optional.empty(), dispatchSpan, core.context().environment().requestTracer());

// Make the Protostellar call.
ListenableFuture<TGrpcResponse> response = executeFutureGrpcCall.apply(endpoint);

Expand All @@ -252,9 +257,7 @@ public void onSuccess(TGrpcResponse response) {
}
else {
request.dispatchDuration(System.nanoTime() - start);
if (dispatchSpan != null) {
dispatchSpan.end();
}
handleDispatchSpan(null, dispatchSpan, scope);
TSdkResult result = convertResponse.apply(response);
request.raisedResponseToUser(null);
ret.tryEmitValue(result).orThrow();
Expand All @@ -265,7 +268,7 @@ public void onSuccess(TGrpcResponse response) {
public void onFailure(Throwable t) {
request.dispatchDuration(System.nanoTime() - start);
ProtostellarRequestBehaviour behaviour = convertException.apply(t);
handleDispatchSpan(behaviour, dispatchSpan);
handleDispatchSpan(behaviour, dispatchSpan, scope);
if (behaviour.retryDuration() != null) {
boolean unableToSchedule = core.context().environment().timer().schedule(() -> {
reactiveInternal(ret, core, request, executeFutureGrpcCall, convertResponse, convertException);
Expand All @@ -291,14 +294,24 @@ public void onFailure(Throwable t) {
}, core.context().environment().executor());
}

private static void handleDispatchSpan(ProtostellarRequestBehaviour behaviour, @Nullable RequestSpan dispatchSpan) {
private static void handleDispatchSpan(@Nullable ProtostellarRequestBehaviour behaviour, @Nullable RequestSpan dispatchSpan, @Nullable AutoCloseable scope) {
if (dispatchSpan != null) {
if (behaviour.exception() != null) {
dispatchSpan.recordException(behaviour.exception());
if (behaviour != null) {
dispatchSpan.status(RequestSpan.StatusCode.ERROR);
if (behaviour.exception() != null) {
dispatchSpan.recordException(behaviour.exception());
}
}
dispatchSpan.status(RequestSpan.StatusCode.ERROR);
dispatchSpan.end();
}
// Note that closing the scope doesn't end the span it owns, it just removes it from ThreadLocalStorage
if (scope != null) {
try {
scope.close();
} catch (Exception e) {
// Silently swallow - OTel should never throw anyway
}
}
}

private static <TGrpcRequest> @Nullable RequestSpan createDispatchSpan(CoreProtostellar core,
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.couchbase.client.core.protostellar;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.deps.io.grpc.ManagedChannelBuilder;

/**
* Contract for request tracers that can integrate with GRPC: such a tracer can set up instrumentation on a
* GRPC channel builder, and can place a span into ThreadLocalStorage for libraries that look it up there.
*/
@Stability.Internal
public interface GrpcAwareRequestTracer {

/**
* Sets up GRPC instrumentation on a given GRPC channel.
*
* @param builder the builder of the GRPC channel that should be instrumented.
*/
void registerGrpc(ManagedChannelBuilder<?> builder);

/**
* Puts {@code span} into ThreadLocalStorage, ready to be picked up by libraries that rely on that mechanism and
* can't be passed the span explicitly.
* <p>
* We require this as asynchronous mechanisms such as reactive and CompletableFuture do not play well with
* ThreadLocalStorage.
*
* @param span the span to put into ThreadLocalStorage.
* @return a handle that the caller is expected to close once the span should no longer be active.
*/
AutoCloseable activateSpan(RequestSpan span);
}
Expand Up @@ -16,6 +16,12 @@
package com.couchbase.client.core.util;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.protostellar.GrpcAwareRequestTracer;
import reactor.util.annotation.Nullable;

import java.util.Optional;

@Stability.Internal
// JVMCBC-1192: candidate for removal: will probably not be required when everything is moved to Core*Ops
Expand All @@ -32,4 +38,14 @@ public static com.couchbase.client.core.deps.com.google.protobuf.Duration conver
public static java.time.Duration convert(com.couchbase.client.core.deps.com.google.protobuf.Duration input) {
return java.time.Duration.ofSeconds(input.getSeconds(), input.getNanos());
}

/**
 * Activates {@code span} through {@code tracer}, provided the tracer is a {@link GrpcAwareRequestTracer}.
 *
 * @param parentSpan not consulted by this method.
 * @param span the span to activate; when null nothing is activated.
 * @param tracer the configured tracer; when null, or not GRPC-aware, nothing is activated.
 * @return the handle returned by the tracer's {@code activateSpan}, or null when nothing was activated.
 */
public static @Nullable AutoCloseable activateSpan(Optional<RequestSpan> parentSpan, @Nullable RequestSpan span, @Nullable RequestTracer tracer) {
  // instanceof evaluates to false for a null tracer, so this also covers the "no tracer" case.
  boolean grpcAware = tracer instanceof GrpcAwareRequestTracer;
  if (!grpcAware || span == null) {
    return null;
  }

  GrpcAwareRequestTracer grpcTracer = (GrpcAwareRequestTracer) tracer;
  return grpcTracer.activateSpan(span);
}
}
3 changes: 3 additions & 0 deletions pom.xml
Expand Up @@ -57,6 +57,9 @@
<scala-maven-plugin.version>4.7.1</scala-maven-plugin.version>
<mvn-scalafmt.version>1.1.1640084764.9f463a9</mvn-scalafmt.version>

<!-- When changing this, make sure to update protostellar/pom.xml opentelemetry-grpc-1.6 version -->
<opentelemetry.version>1.19.0</opentelemetry.version>

<gpg.keyname></gpg.keyname>
<gpg.passphrase></gpg.passphrase>
<!-- Required to stop the exec-maven-plugin complaining -->
Expand Down
8 changes: 8 additions & 0 deletions protostellar/pom.xml
Expand Up @@ -56,6 +56,14 @@
<version>${grpc.version}</version>
</dependency>

<!-- See comment in core-io-deps/pom.xml for why this dependency is here -->
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-grpc-1.6</artifactId>
<!-- Older versions of Maven (certainly 3.6.3) cannot support "${opentelemetry.version}-alpha" -->
<version>1.19.0-alpha</version>
</dependency>

<!--Needed on Windows for annotations-->
<dependency>
<groupId>javax.annotation</groupId>
Expand Down
4 changes: 0 additions & 4 deletions tracing-opentelemetry/pom.xml
Expand Up @@ -15,10 +15,6 @@
<name>OpenTelemetry Interoperability</name>
<description>Provides interoperability with OpenTelemetry</description>

<properties>
<opentelemetry.version>1.19.0</opentelemetry.version>
</properties>

<dependencies>
<dependency>
<groupId>com.couchbase.client</groupId>
Expand Down
Expand Up @@ -18,8 +18,12 @@

import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.deps.io.grpc.ClientInterceptor;
import com.couchbase.client.core.deps.io.grpc.ManagedChannelBuilder;
import com.couchbase.client.core.deps.io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.error.TracerException;
import com.couchbase.client.core.protostellar.GrpcAwareRequestTracer;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
Expand All @@ -28,6 +32,7 @@
import io.opentelemetry.api.trace.TracerProvider;
import io.opentelemetry.context.Context;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

import java.net.URL;
import java.time.Duration;
Expand All @@ -42,7 +47,7 @@
* Wraps the OpenTelemetry tracer so it is suitable to be passed in into the couchbase environment and picked up
* by the rest of the SDK as a result.
*/
public class OpenTelemetryRequestTracer implements RequestTracer {
public class OpenTelemetryRequestTracer implements RequestTracer, GrpcAwareRequestTracer {

public static final String INSTRUMENTATION_NAME = "com.couchbase.client.jvm";

Expand Down Expand Up @@ -75,6 +80,7 @@ public class OpenTelemetryRequestTracer implements RequestTracer {
* Holds the actual OTel tracer.
*/
private final Tracer tracer;
private final @Nullable OpenTelemetry openTelemetry;

/**
* Wraps OpenTelemetry and returns a datatype that can be passed into the requestTracer method of the
Expand All @@ -84,7 +90,7 @@ public class OpenTelemetryRequestTracer implements RequestTracer {
* @return the wrapped OpenTelemetry ready to be passed in.
*/
public static OpenTelemetryRequestTracer wrap(final OpenTelemetry openTelemetry) {
return wrap(openTelemetry.getTracerProvider());
return new OpenTelemetryRequestTracer(openTelemetry.getTracerProvider(), openTelemetry);
}

/**
Expand All @@ -95,10 +101,10 @@ public static OpenTelemetryRequestTracer wrap(final OpenTelemetry openTelemetry)
* @return the wrapped OpenTelemetry ready to be passed in.
*/
public static OpenTelemetryRequestTracer wrap(final TracerProvider tracerProvider) {
return new OpenTelemetryRequestTracer(tracerProvider);
return new OpenTelemetryRequestTracer(tracerProvider, null);
}

private OpenTelemetryRequestTracer(TracerProvider tracerProvider) {
private OpenTelemetryRequestTracer(TracerProvider tracerProvider, @Nullable OpenTelemetry openTelemetry) {
String version = null;
try {
version = MANIFEST_INFOS.get("couchbase-java-tracing-opentelemetry").getValue("Impl-Version");
Expand All @@ -109,6 +115,7 @@ private OpenTelemetryRequestTracer(TracerProvider tracerProvider) {
this.tracer = version != null
? tracerProvider.get(INSTRUMENTATION_NAME, version)
: tracerProvider.get(INSTRUMENTATION_NAME);
this.openTelemetry = openTelemetry;
}

private Span castSpan(final RequestSpan requestSpan) {
Expand Down Expand Up @@ -158,4 +165,19 @@ public Mono<Void> stop(Duration timeout) {
return Mono.empty(); // Tracer should not be stopped by us
}

/**
 * Adds the OpenTelemetry GRPC client interceptor to {@code builder}.
 * <p>
 * Does nothing when this tracer holds no {@link OpenTelemetry} instance.
 *
 * @param builder the GRPC channel builder to instrument.
 */
@Override
public void registerGrpc(ManagedChannelBuilder<?> builder) {
  if (openTelemetry == null) {
    // Without an OpenTelemetry instance there is nothing to build the GRPC instrumentation from.
    return;
  }

  builder.intercept(GrpcTelemetry.create(openTelemetry).newClientInterceptor());
}

/**
 * Makes the OpenTelemetry span behind {@code span} the current one.
 *
 * @param span the SDK span to activate; it is converted via {@code castSpan}.
 * @return the scope produced by {@code makeCurrent()}, to be closed by the caller.
 */
@Override
public AutoCloseable activateSpan(RequestSpan span) {
  final Span otelSpan = castSpan(span);
  return otelSpan.makeCurrent();
}
}

0 comments on commit 37a82f5

Please sign in to comment.