From 33a9126747e1ce55c4b18730ceeff61af935e418 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Tue, 16 Sep 2025 16:09:52 +0200 Subject: [PATCH 1/4] Lazily obtain feature discovery when starting client metrics --- .../ConflatingMetricsAggregatorBenchmark.java | 30 +++++--- .../metrics/ConflatingMetricsAggregator.java | 75 +++++++++++-------- .../java/datadog/trace/core/CoreTracer.java | 8 +- .../ConflatingMetricAggregatorTest.groovy | 43 ++++++----- .../common/metrics/FootprintForkedTest.groovy | 5 +- 5 files changed, 91 insertions(+), 70 deletions(-) diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index 3ae8a050f32..4f5911ebc69 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -4,6 +4,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.communication.monitor.Monitoring; import datadog.trace.api.WellKnownTags; import datadog.trace.core.CoreSpan; @@ -32,20 +33,25 @@ @OutputTimeUnit(MICROSECONDS) @Fork(value = 1) public class ConflatingMetricsAggregatorBenchmark { - private final DDAgentFeaturesDiscovery featuresDiscovery = - new FixedAgentFeaturesDiscovery( - Collections.singleton("peer.hostname"), Collections.emptySet()); - private final ConflatingMetricsAggregator aggregator = - new ConflatingMetricsAggregator( - new WellKnownTags("", "", "", "", "", ""), - Collections.emptySet(), - featuresDiscovery, - HealthMetrics.NO_OP, - new NullSink(), - 2048, - 2048); + private final SharedCommunicationObjects sco = new SharedCommunicationObjects(); + private final ConflatingMetricsAggregator 
aggregator; private final List> spans = generateTrace(64); + public ConflatingMetricsAggregatorBenchmark() { + sco.setFeaturesDiscovery( + new FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet())); + aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + sco, + HealthMetrics.NO_OP, + new NullSink(), + 2048, + 2048); + } + static List> generateTrace(int len) { final List> trace = new ArrayList<>(); for (int i = 0; i < len; i++) { diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index d2fd5e12a93..9eac67316a6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -98,7 +98,8 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final Aggregator aggregator; private final long reportingInterval; private final TimeUnit reportingIntervalTimeUnit; - private final DDAgentFeaturesDiscovery features; + private final SharedCommunicationObjects sharedCommunicationObjects; + private DDAgentFeaturesDiscovery features; private final HealthMetrics healthMetrics; private volatile AgentTaskScheduler.Scheduled cancellation; @@ -110,7 +111,7 @@ public ConflatingMetricsAggregator( this( config.getWellKnownTags(), config.getMetricsIgnoredResources(), - sharedCommunicationObjects.featuresDiscovery(config), + sharedCommunicationObjects, healthMetrics, new OkHttpSink( sharedCommunicationObjects.okHttpClient, @@ -126,7 +127,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, - DDAgentFeaturesDiscovery features, + SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics 
healthMetric, Sink sink, int maxAggregates, @@ -134,7 +135,7 @@ public ConflatingMetricsAggregator( this( wellKnownTags, ignoredResources, - features, + sharedCommunicationObjects, healthMetric, sink, maxAggregates, @@ -146,7 +147,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, - DDAgentFeaturesDiscovery features, + SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetric, Sink sink, int maxAggregates, @@ -155,7 +156,7 @@ public ConflatingMetricsAggregator( TimeUnit timeUnit) { this( ignoredResources, - features, + sharedCommunicationObjects, healthMetric, sink, new SerializingMetricWriter(wellKnownTags, sink), @@ -167,7 +168,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( Set ignoredResources, - DDAgentFeaturesDiscovery features, + SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetric, Sink sink, MetricWriter metricWriter, @@ -180,7 +181,7 @@ public ConflatingMetricsAggregator( this.batchPool = new SpmcArrayQueue<>(maxAggregates); this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3); this.keys = new NonBlockingHashMap<>(); - this.features = features; + this.sharedCommunicationObjects = sharedCommunicationObjects; this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = @@ -200,6 +201,7 @@ public ConflatingMetricsAggregator( @Override public void start() { + features = sharedCommunicationObjects.featuresDiscovery(Config.get()); sink.register(this); thread.start(); cancellation = @@ -214,10 +216,13 @@ public void start() { } private boolean isMetricsEnabled() { - if (features.getMetricsEndpoint() == null) { - features.discoverIfOutdated(); + if (features != null) { + if (features.getMetricsEndpoint() == null) { + features.discoverIfOutdated(); + } + return features.supportsMetrics(); } - return features.supportsMetrics(); + return false; } @Override @@ -237,7 +242,7 @@ public boolean report() { 
@Override public Future forceReport() { // Ensure the feature is enabled - if (!isMetricsEnabled()) { + if (features != null && !isMetricsEnabled()) { return CompletableFuture.completedFuture(false); } // Wait for the thread to start @@ -273,7 +278,7 @@ public Future forceReport() { public boolean publish(List> trace) { boolean forceKeep = false; int counted = 0; - if (features.supportsMetrics()) { + if (features != null && features.supportsMetrics()) { for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); if (shouldComputeMetric(span)) { @@ -354,31 +359,34 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { } private List getPeerTags(CoreSpan span, String spanKind) { - if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { - List peerTags = new ArrayList<>(); - for (String peerTag : features.peerTags()) { - Object value = span.getTag(peerTag); - if (value != null) { + if (features != null) { + if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { + List peerTags = new ArrayList<>(); + for (String peerTag : features.peerTags()) { + Object value = span.getTag(peerTag); + if (value != null) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); + peerTags.add( + cacheAndCreator + .getLeft() + .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + } + } + return peerTags; + } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { + // in this case only the base service should be aggregated if present + final Object baseService = span.getTag(BASE_SERVICE); + if (baseService != null) { final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - peerTags.add( + cacheAndCreator = + PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); + return Collections.singletonList( cacheAndCreator .getLeft() - .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + 
.computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); } } - return peerTags; - } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { - // in this case only the base service should be aggregated if present - final Object baseService = span.getTag(BASE_SERVICE); - if (baseService != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); - return Collections.singletonList( - cacheAndCreator - .getLeft() - .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); - } } return Collections.emptyList(); } @@ -434,6 +442,7 @@ public void onEvent(EventType eventType, String message) { } private void disable() { + // note: disable is called only if started so we're not nullchecking before accessing features features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index f1ca6fb3137..bf240410262 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -99,7 +99,6 @@ import datadog.trace.core.traceinterceptor.LatencyTraceInterceptor; import datadog.trace.lambda.LambdaHandler; import datadog.trace.relocate.api.RatelimitedLogger; -import datadog.trace.util.AgentTaskScheduler; import java.io.IOException; import java.lang.ref.WeakReference; import java.math.BigInteger; @@ -784,11 +783,8 @@ private CoreTracer( metricsAggregator = createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics); - // Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds - // (using milliseconds granularity.) This avoids a fleet of traced applications starting at the - // same time from sending metrics in sync. 
- AgentTaskScheduler.get() - .scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS); + // the jitter is brought implicitly by the callback that can vary + sharedCommunicationObjects.whenReady(metricsAggregator::start); if (dataStreamsMonitoring == null) { this.dataStreamsMonitoring = diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 52c1bb34de1..747cb33e7e3 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -1,6 +1,7 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.DDAgentFeaturesDiscovery +import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.WellKnownTags import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.core.CoreSpan @@ -38,7 +39,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, empty, - features, + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, 10, @@ -68,7 +69,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, [ignoredResourceName].toSet(), - features, + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, 10, @@ -104,7 +105,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, 
sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -146,7 +147,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -197,7 +198,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -256,7 +257,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -304,7 +305,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, 
writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() @@ -353,7 +354,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) long duration = 100 List trace = [ new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"), @@ -418,7 +419,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -477,7 +478,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -567,7 +568,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + 
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -623,7 +624,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -670,7 +671,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -709,7 +710,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -740,7 +741,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) 
aggregator.start() when: @@ -762,7 +763,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> false features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) ] @@ -794,7 +795,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) when: def async = CompletableFuture.supplyAsync(new Supplier() { @@ -827,7 +828,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -875,4 +876,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Thread.sleep(10) } } + + def sharedCommunicationObjects(features) { + def ret = new SharedCommunicationObjects() + ret.setFeaturesDiscovery(features) + ret + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy 
b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index 4a96460d604..da5fe6a26c4 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -1,6 +1,7 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.DDAgentFeaturesDiscovery +import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.WellKnownTags import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification @@ -26,14 +27,16 @@ class FootprintForkedTest extends DDSpecification { setup: CountDownLatch latch = new CountDownLatch(1) ValidatingSink sink = new ValidatingSink(latch) + SharedCommunicationObjects sco = new SharedCommunicationObjects() DDAgentFeaturesDiscovery features = Stub(DDAgentFeaturesDiscovery) { it.supportsMetrics() >> true it.peerTags() >> [] } + sco.setFeaturesDiscovery(features) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, - features, + sco, HealthMetrics.NO_OP, sink, 1000, From 059088c998c38dd92c03f411bcf8fe9813dc5fde Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Tue, 16 Sep 2025 16:53:12 +0200 Subject: [PATCH 2/4] Add volatile to features --- .../trace/common/metrics/ConflatingMetricsAggregator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 9eac67316a6..1383a7ea93f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -99,7 +99,7 @@ public final 
class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final long reportingInterval; private final TimeUnit reportingIntervalTimeUnit; private final SharedCommunicationObjects sharedCommunicationObjects; - private DDAgentFeaturesDiscovery features; + private volatile DDAgentFeaturesDiscovery features; private final HealthMetrics healthMetrics; private volatile AgentTaskScheduler.Scheduled cancellation; From 2b1c37956339d45746396f62ee08ec48fd5acb95 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Tue, 16 Sep 2025 20:29:12 +0200 Subject: [PATCH 3/4] Defer discovery as much as possible --- .../metrics/ConflatingMetricsAggregator.java | 37 +++++++++++-------- .../metrics/MetricsReliabilityTest.groovy | 8 ++++ 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 1383a7ea93f..0c0c7c0d263 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -199,9 +199,20 @@ public ConflatingMetricsAggregator( this.reportingIntervalTimeUnit = timeUnit; } + private DDAgentFeaturesDiscovery getFeatures() { + DDAgentFeaturesDiscovery ret = features; + if (ret == null) { + return ret; + } + ret = sharedCommunicationObjects.featuresDiscovery(Config.get()); + features = ret; + return ret; + } + @Override public void start() { - features = sharedCommunicationObjects.featuresDiscovery(Config.get()); + AgentTaskScheduler.get() + .execute(() -> features = sharedCommunicationObjects.featuresDiscovery(Config.get())); sink.register(this); thread.start(); cancellation = @@ -216,13 +227,7 @@ public void start() { } private boolean isMetricsEnabled() { - if (features != null) { - if (features.getMetricsEndpoint() == null) { - 
features.discoverIfOutdated(); - } - return features.supportsMetrics(); - } - return false; + return getFeatures().supportsMetrics(); } @Override @@ -278,6 +283,7 @@ public Future forceReport() { public boolean publish(List> trace) { boolean forceKeep = false; int counted = 0; + final DDAgentFeaturesDiscovery features = getFeatures(); if (features != null && features.supportsMetrics()) { for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); @@ -288,7 +294,7 @@ public boolean publish(List> trace) { break; } counted++; - forceKeep |= publish(span, isTopLevel); + forceKeep |= publish(span, isTopLevel, features); } } healthMetrics.onClientStatTraceComputed( @@ -310,7 +316,7 @@ private boolean spanKindEligible(CoreSpan span) { return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); } - private boolean publish(CoreSpan span, boolean isTopLevel) { + private boolean publish(CoreSpan span, boolean isTopLevel, DDAgentFeaturesDiscovery features) { final CharSequence spanKind = span.getTag(SPAN_KIND, ""); MetricKey newKey = new MetricKey( @@ -323,7 +329,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { span.getParentId() == 0, SPAN_KINDS.computeIfAbsent( spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span, spanKind.toString())); + getPeerTags(span, spanKind.toString(), features)); boolean isNewKey = false; MetricKey key = keys.putIfAbsent(newKey, newKey); if (null == key) { @@ -358,7 +364,8 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return isNewKey || span.getError() > 0; } - private List getPeerTags(CoreSpan span, String spanKind) { + private List getPeerTags( + CoreSpan span, String spanKind, DDAgentFeaturesDiscovery features) { if (features != null) { if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { List peerTags = new ArrayList<>(); @@ -425,8 +432,7 @@ public void onEvent(EventType eventType, String message) { switch (eventType) { 
case DOWNGRADED: log.debug("Agent downgrade was detected"); - disable(); - healthMetrics.onClientStatDowngraded(); + AgentTaskScheduler.get().execute(this::disable); break; case BAD_PAYLOAD: log.debug("bad metrics payload sent to trace agent: {}", message); @@ -442,10 +448,11 @@ public void onEvent(EventType eventType, String message) { } private void disable() { - // note: disable is called only if started so we're not nullchecking before accessing features + final DDAgentFeaturesDiscovery features = getFeatures(); features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); + healthMetrics.onClientStatDowngraded(); this.pending.clear(); this.batchPool.clear(); this.inbox.clear(); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy index 8c7c64a275b..26103f11bed 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy @@ -12,6 +12,8 @@ import java.util.concurrent.CountDownLatch import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer +import java.util.concurrent.TimeUnit + class MetricsReliabilityTest extends DDCoreSpecification { static class State { @@ -87,6 +89,7 @@ class MetricsReliabilityTest extends DDCoreSpecification { when: "simulate an agent downgrade" + def discoveryState = featuresDiscovery.discoveryState state.reset(false, 404) tracer.startSpan("test", "test").finish() tracer.flush() @@ -94,6 +97,11 @@ class MetricsReliabilityTest extends DDCoreSpecification { then: "a discovery should have done - we do not support anymore stats calculation" state.latch.await() + // wait at most 5 seconds. 
the discovery is done asynchronously so we should wait for the internal state flip + def start = System.nanoTime() + while (discoveryState == featuresDiscovery.discoveryState && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) { + Thread.yield() + } assert !featuresDiscovery.supportsMetrics() // 2 traces processed. 1 p0 dropped. 2 requests and 1 downgrade no errors assertMetrics(healthMetrics, 2, 1, 2, 0, 1) From 36b62dd6a7a5341b993e03bc5c5734cf56d28cef Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 19 Sep 2025 09:28:35 +0200 Subject: [PATCH 4/4] review and simplification --- .../metrics/ConflatingMetricsAggregator.java | 64 ++++++++----------- .../java/datadog/trace/core/CoreTracer.java | 10 ++- 2 files changed, 36 insertions(+), 38 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 0c0c7c0d263..04f60a78fa1 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -199,11 +199,13 @@ public ConflatingMetricsAggregator( this.reportingIntervalTimeUnit = timeUnit; } - private DDAgentFeaturesDiscovery getFeatures() { + private DDAgentFeaturesDiscovery featuresDiscovery() { DDAgentFeaturesDiscovery ret = features; - if (ret == null) { + if (ret != null) { return ret; } + // no need to synchronise here since it's already done in sharedCommunicationObject. 
+ // At worst, we'll assign the variable multiple times but it will be the same object ret = sharedCommunicationObjects.featuresDiscovery(Config.get()); features = ret; return ret; @@ -211,8 +213,6 @@ private DDAgentFeaturesDiscovery getFeatures() { @Override public void start() { - AgentTaskScheduler.get() - .execute(() -> features = sharedCommunicationObjects.featuresDiscovery(Config.get())); sink.register(this); thread.start(); cancellation = @@ -226,10 +226,6 @@ public void start() { log.debug("started metrics aggregator"); } - private boolean isMetricsEnabled() { - return getFeatures().supportsMetrics(); - } - @Override public boolean report() { boolean published; @@ -246,8 +242,7 @@ public boolean report() { @Override public Future forceReport() { - // Ensure the feature is enabled - if (features != null && !isMetricsEnabled()) { + if (!featuresDiscovery().supportsMetrics()) { return CompletableFuture.completedFuture(false); } // Wait for the thread to start @@ -283,8 +278,8 @@ public Future forceReport() { public boolean publish(List> trace) { boolean forceKeep = false; int counted = 0; - final DDAgentFeaturesDiscovery features = getFeatures(); - if (features != null && features.supportsMetrics()) { + final DDAgentFeaturesDiscovery features = featuresDiscovery(); + if (features.supportsMetrics()) { for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); if (shouldComputeMetric(span)) { @@ -366,34 +361,31 @@ private boolean publish(CoreSpan span, boolean isTopLevel, DDAgentFeaturesDis private List getPeerTags( CoreSpan span, String spanKind, DDAgentFeaturesDiscovery features) { - if (features != null) { - if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { - List peerTags = new ArrayList<>(); - for (String peerTag : features.peerTags()) { - Object value = span.getTag(peerTag); - if (value != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - peerTags.add( - 
cacheAndCreator - .getLeft() - .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); - } - } - return peerTags; - } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { - // in this case only the base service should be aggregated if present - final Object baseService = span.getTag(BASE_SERVICE); - if (baseService != null) { + if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { + List peerTags = new ArrayList<>(); + for (String peerTag : features.peerTags()) { + Object value = span.getTag(peerTag); + if (value != null) { final Pair, Function> - cacheAndCreator = - PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); - return Collections.singletonList( + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); + peerTags.add( cacheAndCreator .getLeft() - .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); + .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); } } + return peerTags; + } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { + // in this case only the base service should be aggregated if present + final Object baseService = span.getTag(BASE_SERVICE); + if (baseService != null) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); + return Collections.singletonList( + cacheAndCreator + .getLeft() + .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); + } } return Collections.emptyList(); } @@ -448,7 +440,7 @@ public void onEvent(EventType eventType, String message) { } private void disable() { - final DDAgentFeaturesDiscovery features = getFeatures(); + final DDAgentFeaturesDiscovery features = featuresDiscovery(); features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java 
index bf240410262..7be97baf194 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -99,6 +99,7 @@ import datadog.trace.core.traceinterceptor.LatencyTraceInterceptor; import datadog.trace.lambda.LambdaHandler; import datadog.trace.relocate.api.RatelimitedLogger; +import datadog.trace.util.AgentTaskScheduler; import java.io.IOException; import java.lang.ref.WeakReference; import java.math.BigInteger; @@ -783,8 +784,13 @@ private CoreTracer( metricsAggregator = createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics); - // the jitter is brought implicitly by the callback that can vary - sharedCommunicationObjects.whenReady(metricsAggregator::start); + // Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds + // (using milliseconds granularity.) This avoids a fleet of traced applications starting at the + // same time from sending metrics in sync. + sharedCommunicationObjects.whenReady( + () -> + AgentTaskScheduler.get() + .scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS)); if (dataStreamsMonitoring == null) { this.dataStreamsMonitoring =