diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/event/data/KnownAddresses.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/event/data/KnownAddresses.java index 0129509a9c4..69f11d980c8 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/event/data/KnownAddresses.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/event/data/KnownAddresses.java @@ -93,6 +93,8 @@ public interface KnownAddresses { Address>> HEADERS_NO_COOKIES = new Address<>("server.request.headers.no_cookies"); + Address GRPC_SERVER_METHOD = new Address<>("grpc.server.method"); + Address GRPC_SERVER_REQUEST_MESSAGE = new Address<>("grpc.server.request.message"); // XXX: Not really used yet, but it's a known address and we should not treat it as unknown. @@ -153,6 +155,8 @@ static Address forName(String name) { return REQUEST_QUERY; case "server.request.headers.no_cookies": return HEADERS_NO_COOKIES; + case "grpc.server.method": + return GRPC_SERVER_METHOD; case "grpc.server.request.message": return GRPC_SERVER_REQUEST_MESSAGE; case "grpc.server.request.metadata": diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java index 040f3e8527d..27273c388fb 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java @@ -79,6 +79,7 @@ public class GatewayBridge { private volatile DataSubscriberInfo requestBodySubInfo; private volatile DataSubscriberInfo pathParamsSubInfo; private volatile DataSubscriberInfo respDataSubInfo; + private volatile DataSubscriberInfo grpcServerMethodSubInfo; private volatile DataSubscriberInfo grpcServerRequestMsgSubInfo; private volatile DataSubscriberInfo graphqlServerRequestMsgSubInfo; private volatile DataSubscriberInfo requestEndSubInfo; @@ -359,6 +360,32 @@ public void init() { return maybePublishResponseData(ctx); }); + subscriptionService.registerCallback( + EVENTS.grpcServerMethod(), + (ctx_, method) -> { + AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return NoopFlow.INSTANCE; + } + while (true) { + DataSubscriberInfo subInfo = grpcServerMethodSubInfo; + if (subInfo == null) { + subInfo = producerService.getDataSubscribers(KnownAddresses.GRPC_SERVER_METHOD); + grpcServerMethodSubInfo = subInfo; + } + if (subInfo == null || subInfo.isEmpty()) { + return NoopFlow.INSTANCE; + } + DataBundle bundle = + new SingletonDataBundle<>(KnownAddresses.GRPC_SERVER_METHOD, method); + try { + return producerService.publishDataEvent(subInfo, ctx, bundle, true); + } catch (ExpiredSubscriberInfoException e) { + grpcServerMethodSubInfo = null; + } + } + }); + subscriptionService.registerCallback( EVENTS.grpcServerRequestMessage(), (ctx_, obj) -> { diff --git a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy index 1b6ba5d3973..efe6cf02cf3 100644 --- a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy +++ b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy @@ -76,6 +76,7 @@ class GatewayBridgeSpecification extends DDSpecification { BiFunction> responseStartedCB TriConsumer respHeaderCB Function> respHeadersDoneCB + BiFunction> grpcServerMethodCB BiFunction> grpcServerRequestMessageCB BiFunction, Flow> graphqlServerRequestMessageCB @@ -410,6 +411,7 @@ class GatewayBridgeSpecification extends DDSpecification { 1 * ig.registerCallback(EVENTS.responseStarted(), _) >> { responseStartedCB = it[1]; null } 1 * ig.registerCallback(EVENTS.responseHeader(), _) >> { respHeaderCB = it[1]; null } 1 * ig.registerCallback(EVENTS.responseHeaderDone(), _) >> { respHeadersDoneCB = it[1]; null } + 1 * ig.registerCallback(EVENTS.grpcServerMethod(), _) >> { grpcServerMethodCB = it[1]; null } 1 * ig.registerCallback(EVENTS.grpcServerRequestMessage(), _) >> { grpcServerRequestMessageCB = it[1]; null } 1 * ig.registerCallback(EVENTS.graphqlServerRequestMessage(), _) >> { graphqlServerRequestMessageCB = it[1]; null } 0 * ig.registerCallback(_, _) @@ -705,6 +707,22 @@ class GatewayBridgeSpecification extends DDSpecification { flow.action == Flow.Action.Noop.INSTANCE } + void 'grpc server method publishes'() { + setup: + eventDispatcher.getDataSubscribers(KnownAddresses.GRPC_SERVER_METHOD) >> nonEmptyDsInfo + DataBundle bundle + + when: + Flow flow = grpcServerMethodCB.apply(ctx, '/my.package.Greeter/SayHello') + + then: + 1 * eventDispatcher.publishDataEvent(nonEmptyDsInfo, ctx.data, _ as DataBundle, true) >> + { args -> bundle = args[2]; NoopFlow.INSTANCE } + bundle.get(KnownAddresses.GRPC_SERVER_METHOD) == '/my.package.Greeter/SayHello' + flow.result == null + flow.action == Flow.Action.Noop.INSTANCE + } + void 'calls trace segment post processor'() { setup: AgentSpan span = Stub() diff --git a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java index 158c7c72d01..8e60fecaf30 100644 --- a/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java @@ -27,6 +27,7 @@ import io.grpc.ForwardingServerCallListener; import io.grpc.Grpc; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; @@ -76,6 +77,7 @@ public ServerCall.Listener interceptCall( if (reqContext != null) { callIGCallbackClientAddress(cbp, reqContext, call); callIGCallbackHeaders(cbp, reqContext, headers); + callIGCallbackGrpcServerMethod(cbp, reqContext, call.getMethodDescriptor()); } DECORATE.afterStart(span); @@ -315,6 +317,16 @@ private static void callIGCallbackRequestEnded(@Nonnull final AgentSpan span) { } } + private static void callIGCallbackGrpcServerMethod( + CallbackProvider cbp, RequestContext ctx, MethodDescriptor methodDescriptor) { + String method = methodDescriptor.getFullMethodName(); + BiFunction> cb = cbp.getCallback(EVENTS.grpcServerMethod()); + if (method == null || cb == null) { + return; + } + cb.apply(ctx, method); + } + private static void callIGCallbackGrpcMessage(@Nonnull final AgentSpan span, Object obj) { if (obj == null) { return; diff --git a/dd-java-agent/instrumentation/armeria-grpc/src/test/groovy/ArmeriaGrpcTest.groovy b/dd-java-agent/instrumentation/armeria-grpc/src/test/groovy/ArmeriaGrpcTest.groovy index 4f93259bedb..9aa49c386d8 100644 --- a/dd-java-agent/instrumentation/armeria-grpc/src/test/groovy/ArmeriaGrpcTest.groovy +++ b/dd-java-agent/instrumentation/armeria-grpc/src/test/groovy/ArmeriaGrpcTest.groovy @@ -45,6 +45,7 @@ abstract class ArmeriaGrpcTest extends VersionedNamingTestBase { def collectedAppSecHeaders = [:] boolean appSecHeaderDone = false + def collectedAppSecServerMethods = [] def collectedAppSecReqMsgs = [] final Duration timeoutDuration() { @@ -97,6 +98,10 @@ abstract class ArmeriaGrpcTest extends VersionedNamingTestBase { collectedAppSecReqMsgs << obj Flow.ResultFlow.empty() } as BiFunction>) + ig.registerCallback(EVENTS.grpcServerMethod(), { reqCtx, method -> + collectedAppSecServerMethods << method + Flow.ResultFlow.empty() + } as BiFunction>) } def cleanup() { @@ -230,6 +235,8 @@ abstract class ArmeriaGrpcTest extends VersionedNamingTestBase { traceId.toLong() as String == collectedAppSecHeaders['x-datadog-trace-id'] collectedAppSecReqMsgs.size() == 1 collectedAppSecReqMsgs.first().name == name + collectedAppSecServerMethods.size() == 1 + collectedAppSecServerMethods.first() == 'example.Greeter/SayHello' and: if (isDataStreamsEnabled()) { diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java index a238d7ac5f6..276524a098a 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java +++ b/dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java @@ -27,6 +27,7 @@ import io.grpc.ForwardingServerCallListener; import io.grpc.Grpc; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; @@ -75,6 +76,7 @@ public ServerCall.Listener interceptCall( if (reqContext != null) { callIGCallbackClientAddress(cbp, reqContext, call); callIGCallbackHeaders(cbp, reqContext, headers); + callIGCallbackGrpcServerMethod(cbp, reqContext, call.getMethodDescriptor()); } DECORATE.afterStart(span); @@ -314,6 +316,16 @@ private static void callIGCallbackRequestEnded(@Nonnull final AgentSpan span) { } } + private static void callIGCallbackGrpcServerMethod( + CallbackProvider cbp, RequestContext ctx, MethodDescriptor methodDescriptor) { + String method = methodDescriptor.getFullMethodName(); + BiFunction> cb = cbp.getCallback(EVENTS.grpcServerMethod()); + if (method == null || cb == null) { + return; + } + cb.apply(ctx, method); + } + private static void callIGCallbackGrpcMessage(@Nonnull final AgentSpan span, Object obj) { if (obj == null) { return; diff --git a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy index c8ee881f940..b8695ac79c2 100644 --- a/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy +++ b/dd-java-agent/instrumentation/grpc-1.5/src/test/groovy/GrpcTest.groovy @@ -47,6 +47,7 @@ abstract class GrpcTest extends VersionedNamingTestBase { def collectedAppSecHeaders = [:] boolean appSecHeaderDone = false def collectedAppSecReqMsgs = [] + def collectedAppSecServerMethods = [] @Override final String service() { @@ -89,6 +90,10 @@ abstract class GrpcTest extends VersionedNamingTestBase { collectedAppSecReqMsgs << obj Flow.ResultFlow.empty() } as BiFunction>) + ig.registerCallback(EVENTS.grpcServerMethod(), { reqCtx, method -> + collectedAppSecServerMethods << method + Flow.ResultFlow.empty() + } as BiFunction>) } def cleanup() { @@ -217,6 +222,8 @@ abstract class GrpcTest extends VersionedNamingTestBase { traceId.toLong() as String == collectedAppSecHeaders['x-datadog-trace-id'] collectedAppSecReqMsgs.size() == 1 collectedAppSecReqMsgs.first().name == name + collectedAppSecServerMethods.size() == 1 + collectedAppSecServerMethods.first() == 'example.Greeter/SayHello' and: if (isDataStreamsEnabled()) { diff --git a/internal-api/src/main/java/datadog/trace/api/gateway/Events.java b/internal-api/src/main/java/datadog/trace/api/gateway/Events.java index e4472874b6d..e62be918135 100644 --- a/internal-api/src/main/java/datadog/trace/api/gateway/Events.java +++ b/internal-api/src/main/java/datadog/trace/api/gateway/Events.java @@ -202,6 +202,17 @@ public EventType>> grpcServerReque GRAPHQL_SERVER_REQUEST_MESSAGE; } + static final int GRPC_SERVER_METHOD_ID = 16; + + @SuppressWarnings("rawtypes") + private static final EventType GRPC_SERVER_METHOD = + new ET<>("grpc.server.method", GRPC_SERVER_METHOD_ID); + + @SuppressWarnings("unchecked") + public EventType>> grpcServerMethod() { + return (EventType>>) GRPC_SERVER_METHOD; + } + static final int MAX_EVENTS = nextId.get(); private static final class ET extends EventType {