From f800fc6ada1102882c7c6907326d80165ef7fe32 Mon Sep 17 00:00:00 2001 From: Richard Startin Date: Mon, 3 Jun 2024 19:01:20 +0100 Subject: [PATCH] extend gRPC context propagation into `WriteQueue`, add queue timing to `WriteQueue` commands (#7110) * extend gRPC context propagation into WriteQueue, add queue timing to WriteQueue commands * fixed forkedTest operation names --- .../grpc/QueuedCommandInstrumentation.java | 84 +++++++++++++++++++ .../AbstractClientStreamInstrumentation.java | 69 +++++++++++++++ .../src/test/groovy/GrpcStreamingTest.groovy | 24 ++++++ .../grpc-1.5/src/test/groovy/GrpcTest.groovy | 23 +++++ .../TaskUnwrappingInstrumentation.java | 2 + 5 files changed, 202 insertions(+) create mode 100644 dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/QueuedCommandInstrumentation.java create mode 100644 dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/AbstractClientStreamInstrumentation.java diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/QueuedCommandInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/QueuedCommandInstrumentation.java new file mode 100644 index 00000000000..c446ed883c6 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/QueuedCommandInstrumentation.java @@ -0,0 +1,84 @@ +package datadog.trace.instrumentation.grpc; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture; +import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope; +import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper; +import datadog.trace.bootstrap.instrumentation.java.concurrent.State; +import java.nio.channels.Channel; +import java.util.Collections; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public final class QueuedCommandInstrumentation extends InstrumenterModule.Profiling + implements Instrumenter.ForKnownTypes { + + private static final String QUEUED_COMMAND = "io.grpc.netty.WriteQueue$QueuedCommand"; + private static final String STATE = + "datadog.trace.bootstrap.instrumentation.java.concurrent.State"; + + public QueuedCommandInstrumentation() { + super("grpc", "grpc-netty"); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct"); + transformer.applyAdvice( + isMethod() + .and( + named("run") + .and( + takesArguments(1) + .and(takesArgument(0, named("io.netty.channel.Channel"))))), + getClass().getName() + "$Run"); + } + + @Override + public Map contextStore() { + return Collections.singletonMap(QUEUED_COMMAND, STATE); + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + "io.grpc.netty.WriteQueue$AbstractQueuedCommand", + "io.grpc.netty.WriteQueue$RunnableCommand", + "io.grpc.netty.SendGrpcFrameCommand" + }; + } + + public static final class Construct { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void after(@Advice.This Object command) { + ContextStore contextStore = InstrumentationContext.get(QUEUED_COMMAND, STATE); + capture(contextStore, command, false); + QueueTimerHelper.startQueuingTimer(contextStore, Channel.class, command); + } + } + + public static final class Run { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope before(@Advice.This Object command) { + return startTaskScope(InstrumentationContext.get(QUEUED_COMMAND, STATE), command); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class) + public static void after(@Advice.Enter AgentScope scope) { + endTaskScope(scope); + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/AbstractClientStreamInstrumentation.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/AbstractClientStreamInstrumentation.java new file mode 100644 index 00000000000..869385b1330 --- /dev/null +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/client/AbstractClientStreamInstrumentation.java @@ -0,0 +1,69 @@ +package datadog.trace.instrumentation.grpc.client; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.grpc.internal.ClientStreamListener; +import java.util.Collections; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public final class AbstractClientStreamInstrumentation extends InstrumenterModule.Profiling + implements Instrumenter.ForSingleType { + + public AbstractClientStreamInstrumentation() { + super("grpc", "grpc-client"); + } + + @Override + public Map contextStore() { + return Collections.singletonMap( + "io.grpc.internal.ClientStreamListener", AgentSpan.class.getName()); + } + + @Override + public String instrumentedType() { + return "io.grpc.internal.AbstractClientStream"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + named("start") + .and( + isMethod() + .and( + takesArgument(0, named("io.grpc.internal.ClientStreamListener")) + .and(takesArguments(1)))), + getClass().getName() + "$ActivateSpan"); + } + + public static final class ActivateSpan { + @Advice.OnMethodEnter + public static AgentScope before(@Advice.Argument(0) ClientStreamListener listener) { + AgentSpan span = + InstrumentationContext.get(ClientStreamListener.class, AgentSpan.class).get(listener); + if (null != span) { + return activateSpan(span); + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class) + public static void after(@Advice.Enter AgentScope scope) { + if (null != scope) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy index 8517f9efc8d..a3efcb9266a 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcStreamingTest.groovy @@ -265,3 +265,27 @@ class GrpcStreamingV1ForkedTest extends GrpcStreamingTest { return "grpc.server.request" } } + +class GrpcStreamingProfilingForkedTest extends GrpcStreamingTest { + + @Override + protected void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.profiling.enabled", "true") + } + + @Override + int version() { + return 1 + } + + @Override + protected String clientOperation() { + return "grpc.client.request" + } + + @Override + protected String serverOperation() { + return "grpc.server.request" + } +} diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy index c8ee881f940..7a5dac37102 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy @@ -692,3 +692,26 @@ class GrpcDataStreamsDisabledForkedTest extends GrpcTest { return "grpc.server" } } + +class GrpcProfilingForkedTest extends GrpcTest { + @Override + protected void configurePreAgent() { + super.configurePreAgent() + injectSysConfig("dd.profiling.enabled", "true") + } + + @Override + int version() { + return 1 + } + + @Override + protected String clientOperation() { + return "grpc.client.request" + } + + @Override + protected String serverOperation() { + return "grpc.server.request" + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/TaskUnwrappingInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/TaskUnwrappingInstrumentation.java index 840a835d5ac..c5d8bf50b98 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/TaskUnwrappingInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/TaskUnwrappingInstrumentation.java @@ -56,6 +56,8 @@ public boolean isEnabled() { "val$r", "io.grpc.Context$2", "val$c", + "io.grpc.netty.WriteQueue$RunnableCommand", + "runnable", "akka.dispatch.TaskInvocation", "runnable", "scala.concurrent.impl.CallbackRunnable",