Skip to content

Commit

Permalink
extend gRPC context propagation into WriteQueue, add queue timing t…
Browse files Browse the repository at this point in the history
…o `WriteQueue` commands (#7110)

* extend gRPC context propagation into WriteQueue, add queue timing to WriteQueue commands

* fixed forkedTest operation names
  • Loading branch information
richardstartin committed Jun 3, 2024
1 parent bffe40c commit f800fc6
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package datadog.trace.instrumentation.grpc;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.endTaskScope;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.startTaskScope;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimerHelper;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.nio.channels.Channel;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public final class QueuedCommandInstrumentation extends InstrumenterModule.Profiling
implements Instrumenter.ForKnownTypes {

private static final String QUEUED_COMMAND = "io.grpc.netty.WriteQueue$QueuedCommand";
private static final String STATE =
"datadog.trace.bootstrap.instrumentation.java.concurrent.State";

public QueuedCommandInstrumentation() {
super("grpc", "grpc-netty");
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct");
transformer.applyAdvice(
isMethod()
.and(
named("run")
.and(
takesArguments(1)
.and(takesArgument(0, named("io.netty.channel.Channel"))))),
getClass().getName() + "$Run");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(QUEUED_COMMAND, STATE);
}

@Override
public String[] knownMatchingTypes() {
return new String[] {
"io.grpc.netty.WriteQueue$AbstractQueuedCommand",
"io.grpc.netty.WriteQueue$RunnableCommand",
"io.grpc.netty.SendGrpcFrameCommand"
};
}

public static final class Construct {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void after(@Advice.This Object command) {
ContextStore<Object, State> contextStore = InstrumentationContext.get(QUEUED_COMMAND, STATE);
capture(contextStore, command, false);
QueueTimerHelper.startQueuingTimer(contextStore, Channel.class, command);
}
}

public static final class Run {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(@Advice.This Object command) {
return startTaskScope(InstrumentationContext.get(QUEUED_COMMAND, STATE), command);
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(@Advice.Enter AgentScope scope) {
endTaskScope(scope);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package datadog.trace.instrumentation.grpc.client;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.grpc.internal.ClientStreamListener;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public final class AbstractClientStreamInstrumentation extends InstrumenterModule.Profiling
implements Instrumenter.ForSingleType {

public AbstractClientStreamInstrumentation() {
super("grpc", "grpc-client");
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
"io.grpc.internal.ClientStreamListener", AgentSpan.class.getName());
}

@Override
public String instrumentedType() {
return "io.grpc.internal.AbstractClientStream";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
named("start")
.and(
isMethod()
.and(
takesArgument(0, named("io.grpc.internal.ClientStreamListener"))
.and(takesArguments(1)))),
getClass().getName() + "$ActivateSpan");
}

public static final class ActivateSpan {
@Advice.OnMethodEnter
public static AgentScope before(@Advice.Argument(0) ClientStreamListener listener) {
AgentSpan span =
InstrumentationContext.get(ClientStreamListener.class, AgentSpan.class).get(listener);
if (null != span) {
return activateSpan(span);
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(@Advice.Enter AgentScope scope) {
if (null != scope) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,27 @@ class GrpcStreamingV1ForkedTest extends GrpcStreamingTest {
return "grpc.server.request"
}
}

class GrpcStreamingProfilingForkedTest extends GrpcStreamingTest {

@Override
protected void configurePreAgent() {
super.configurePreAgent()
injectSysConfig("dd.profiling.enabled", "true")
}

@Override
int version() {
return 1
}

@Override
protected String clientOperation() {
return "grpc.client.request"
}

@Override
protected String serverOperation() {
return "grpc.server.request"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -692,3 +692,26 @@ class GrpcDataStreamsDisabledForkedTest extends GrpcTest {
return "grpc.server"
}
}

class GrpcProfilingForkedTest extends GrpcTest {
@Override
protected void configurePreAgent() {
super.configurePreAgent()
injectSysConfig("dd.profiling.enabled", "true")
}

@Override
int version() {
return 1
}

@Override
protected String clientOperation() {
return "grpc.client.request"
}

@Override
protected String serverOperation() {
return "grpc.server.request"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public boolean isEnabled() {
"val$r",
"io.grpc.Context$2",
"val$c",
"io.grpc.netty.WriteQueue$RunnableCommand",
"runnable",
"akka.dispatch.TaskInvocation",
"runnable",
"scala.concurrent.impl.CallbackRunnable",
Expand Down

0 comments on commit f800fc6

Please sign in to comment.