Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public AsyncPropagatingDisableInstrumentation() {
namedOneOf("reactor.core.scheduler.SchedulerTask", "reactor.core.scheduler.WorkerTask");
private static final ElementMatcher<TypeDescription> RXJAVA2_DISABLED_TYPE_INITIALIZERS =
named("io.reactivex.internal.schedulers.AbstractDirectTask");
private static final ElementMatcher<TypeDescription> JAVA_HTTP_CLIENT =
extendsClass(named("java.net.http.HttpClient"));

@Override
public boolean onlyMatchKnownTypes() {
Expand Down Expand Up @@ -80,7 +82,8 @@ public String[] knownMatchingTypes() {
"org.springframework.jms.listener.DefaultMessageListenerContainer",
"org.apache.activemq.broker.TransactionBroker",
"com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager",
"io.reactivex.internal.schedulers.AbstractDirectTask"
"io.reactivex.internal.schedulers.AbstractDirectTask",
"jdk.internal.net.http.HttpClientImpl"
};
}

Expand All @@ -94,7 +97,8 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
return RX_WORKERS
.or(GRPC_MANAGED_CHANNEL)
.or(REACTOR_DISABLED_TYPE_INITIALIZERS)
.or(RXJAVA2_DISABLED_TYPE_INITIALIZERS);
.or(RXJAVA2_DISABLED_TYPE_INITIALIZERS)
.or(JAVA_HTTP_CLIENT);
}

@Override
Expand Down Expand Up @@ -180,6 +184,7 @@ public void methodAdvice(MethodTransformer transformer) {
isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice);
transformer.applyAdvice(
isTypeInitializer().and(isDeclaredBy(RXJAVA2_DISABLED_TYPE_INITIALIZERS)), advice);
transformer.applyAdvice(namedOneOf("sendAsync").and(isDeclaredBy(JAVA_HTTP_CLIENT)), advice);
}

public static class DisableAsyncAdvice {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package datadog.trace.instrumentation.httpclient;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureSpan;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
Expand All @@ -11,20 +14,21 @@

public class BodyHandlerWrapper<T> implements BodyHandler<T> {
private final BodyHandler<T> delegate;
private final AgentScope.Continuation continuation;
private final AgentSpan span;

public BodyHandlerWrapper(BodyHandler<T> delegate, AgentScope.Continuation context) {
public BodyHandlerWrapper(BodyHandler<T> delegate, AgentSpan span) {
this.delegate = delegate;
this.continuation = context;
this.span = span;
}

@Override
public BodySubscriber<T> apply(ResponseInfo responseInfo) {
// Capture the continuation lazily here rather than at sendAsync() call time.
BodySubscriber<T> subscriber = delegate.apply(responseInfo);
if (subscriber instanceof BodySubscriberWrapper) {
return subscriber;
}
return new BodySubscriberWrapper<>(subscriber, continuation);
return new BodySubscriberWrapper<>(subscriber, captureSpan(span));
}

static class BodySubscriberWrapper<T> implements BodySubscriber<T> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datadog.trace.instrumentation.httpclient;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.captureSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.httpclient.JavaNetClientDecorator.DECORATE;
import static datadog.trace.instrumentation.httpclient.JavaNetClientDecorator.INSTRUMENTATION_NAME;
Expand Down Expand Up @@ -38,7 +37,10 @@ public static AgentScope methodEnter(
final AgentSpan span = startSpan(INSTRUMENTATION_NAME, OPERATION_NAME);
final AgentScope scope = activateSpan(span);
if (bodyHandler != null) {
bodyHandler = new BodyHandlerWrapper<>(bodyHandler, captureSpan(span));
// Pass span directly — BodyHandlerWrapper captures the continuation lazily in apply(),
// only once response headers arrive. This avoids leaking a continuation when the
// connection fails before headers are received.
bodyHandler = new BodyHandlerWrapper<>(bodyHandler, span);
}

DECORATE.afterStart(span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import java.net.http.HttpResponse
import java.time.Duration

abstract class JavaHttpClientTest extends HttpClientTest {
@Override
boolean useStrictTraceWrites() {
// TODO fix this by making sure that spans get closed properly
return false
}
Comment thread
amarziali marked this conversation as resolved.

def client = HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(CONNECT_TIMEOUT_MS))
Expand Down