diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index 3ae8a050f32..4f5911ebc69 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -4,6 +4,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.communication.monitor.Monitoring; import datadog.trace.api.WellKnownTags; import datadog.trace.core.CoreSpan; @@ -32,20 +33,25 @@ @OutputTimeUnit(MICROSECONDS) @Fork(value = 1) public class ConflatingMetricsAggregatorBenchmark { - private final DDAgentFeaturesDiscovery featuresDiscovery = - new FixedAgentFeaturesDiscovery( - Collections.singleton("peer.hostname"), Collections.emptySet()); - private final ConflatingMetricsAggregator aggregator = - new ConflatingMetricsAggregator( - new WellKnownTags("", "", "", "", "", ""), - Collections.emptySet(), - featuresDiscovery, - HealthMetrics.NO_OP, - new NullSink(), - 2048, - 2048); + private final SharedCommunicationObjects sco = new SharedCommunicationObjects(); + private final ConflatingMetricsAggregator aggregator; private final List> spans = generateTrace(64); + public ConflatingMetricsAggregatorBenchmark() { + sco.setFeaturesDiscovery( + new FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet())); + aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + sco, + HealthMetrics.NO_OP, + new NullSink(), + 2048, + 2048); + } + static List> generateTrace(int len) { final List> trace = new ArrayList<>(); for (int i = 0; i < len; i++) { diff --git 
a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index d2fd5e12a93..04f60a78fa1 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -98,7 +98,8 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final Aggregator aggregator; private final long reportingInterval; private final TimeUnit reportingIntervalTimeUnit; - private final DDAgentFeaturesDiscovery features; + private final SharedCommunicationObjects sharedCommunicationObjects; + private volatile DDAgentFeaturesDiscovery features; private final HealthMetrics healthMetrics; private volatile AgentTaskScheduler.Scheduled cancellation; @@ -110,7 +111,7 @@ public ConflatingMetricsAggregator( this( config.getWellKnownTags(), config.getMetricsIgnoredResources(), - sharedCommunicationObjects.featuresDiscovery(config), + sharedCommunicationObjects, healthMetrics, new OkHttpSink( sharedCommunicationObjects.okHttpClient, @@ -126,7 +127,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, - DDAgentFeaturesDiscovery features, + SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetric, Sink sink, int maxAggregates, @@ -134,7 +135,7 @@ public ConflatingMetricsAggregator( this( wellKnownTags, ignoredResources, - features, + sharedCommunicationObjects, healthMetric, sink, maxAggregates, @@ -146,7 +147,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, - DDAgentFeaturesDiscovery features, + SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetric, Sink sink, int maxAggregates, @@ -155,7 +156,7 @@ public 
ConflatingMetricsAggregator( TimeUnit timeUnit) { this( ignoredResources, - features, + sharedCommunicationObjects, healthMetric, sink, new SerializingMetricWriter(wellKnownTags, sink), @@ -167,7 +168,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( Set ignoredResources, - DDAgentFeaturesDiscovery features, + SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetric, Sink sink, MetricWriter metricWriter, @@ -180,7 +181,7 @@ public ConflatingMetricsAggregator( this.batchPool = new SpmcArrayQueue<>(maxAggregates); this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3); this.keys = new NonBlockingHashMap<>(); - this.features = features; + this.sharedCommunicationObjects = sharedCommunicationObjects; this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = @@ -198,6 +199,18 @@ public ConflatingMetricsAggregator( this.reportingIntervalTimeUnit = timeUnit; } + private DDAgentFeaturesDiscovery featuresDiscovery() { + DDAgentFeaturesDiscovery ret = features; + if (ret != null) { + return ret; + } + // no need to synchronise here since it's already done in sharedCommunicationObjects. 
+ // At worst, we'll assign the variable multiple times but it will be the same object + ret = sharedCommunicationObjects.featuresDiscovery(Config.get()); + features = ret; + return ret; + } + @Override public void start() { sink.register(this); @@ -213,13 +226,6 @@ public void start() { log.debug("started metrics aggregator"); } - private boolean isMetricsEnabled() { - if (features.getMetricsEndpoint() == null) { - features.discoverIfOutdated(); - } - return features.supportsMetrics(); - } - @Override public boolean report() { boolean published; @@ -236,8 +242,7 @@ public boolean report() { @Override public Future forceReport() { - // Ensure the feature is enabled - if (!isMetricsEnabled()) { + if (!featuresDiscovery().supportsMetrics()) { return CompletableFuture.completedFuture(false); } // Wait for the thread to start @@ -273,6 +278,7 @@ public Future forceReport() { public boolean publish(List> trace) { boolean forceKeep = false; int counted = 0; + final DDAgentFeaturesDiscovery features = featuresDiscovery(); if (features.supportsMetrics()) { for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); @@ -283,7 +289,7 @@ public boolean publish(List> trace) { break; } counted++; - forceKeep |= publish(span, isTopLevel); + forceKeep |= publish(span, isTopLevel, features); } } healthMetrics.onClientStatTraceComputed( @@ -305,7 +311,7 @@ private boolean spanKindEligible(CoreSpan span) { return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); } - private boolean publish(CoreSpan span, boolean isTopLevel) { + private boolean publish(CoreSpan span, boolean isTopLevel, DDAgentFeaturesDiscovery features) { final CharSequence spanKind = span.getTag(SPAN_KIND, ""); MetricKey newKey = new MetricKey( @@ -318,7 +324,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { span.getParentId() == 0, SPAN_KINDS.computeIfAbsent( spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span, 
spanKind.toString())); + getPeerTags(span, spanKind.toString(), features)); boolean isNewKey = false; MetricKey key = keys.putIfAbsent(newKey, newKey); if (null == key) { @@ -353,7 +359,8 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return isNewKey || span.getError() > 0; } - private List getPeerTags(CoreSpan span, String spanKind) { + private List getPeerTags( + CoreSpan span, String spanKind, DDAgentFeaturesDiscovery features) { if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { List peerTags = new ArrayList<>(); for (String peerTag : features.peerTags()) { @@ -417,8 +424,7 @@ public void onEvent(EventType eventType, String message) { switch (eventType) { case DOWNGRADED: log.debug("Agent downgrade was detected"); - disable(); - healthMetrics.onClientStatDowngraded(); + AgentTaskScheduler.get().execute(this::disable); break; case BAD_PAYLOAD: log.debug("bad metrics payload sent to trace agent: {}", message); @@ -434,9 +440,11 @@ public void onEvent(EventType eventType, String message) { } private void disable() { + final DDAgentFeaturesDiscovery features = featuresDiscovery(); features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); + healthMetrics.onClientStatDowngraded(); this.pending.clear(); this.batchPool.clear(); this.inbox.clear(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index f1ca6fb3137..7be97baf194 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -787,8 +787,10 @@ private CoreTracer( // Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds // (using milliseconds granularity.) This avoids a fleet of traced applications starting at the // same time from sending metrics in sync. 
- AgentTaskScheduler.get() - .scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS); + sharedCommunicationObjects.whenReady( + () -> + AgentTaskScheduler.get() + .scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS)); if (dataStreamsMonitoring == null) { this.dataStreamsMonitoring = diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 52c1bb34de1..747cb33e7e3 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -1,6 +1,7 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.DDAgentFeaturesDiscovery +import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.WellKnownTags import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.core.CoreSpan @@ -38,7 +39,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, empty, - features, + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, 10, @@ -68,7 +69,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, [ignoredResourceName].toSet(), - features, + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, 10, @@ -104,7 +105,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + 
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -146,7 +147,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -197,7 +198,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -256,7 +257,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -304,7 +305,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, 
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() @@ -353,7 +354,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) long duration = 100 List trace = [ new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"), @@ -418,7 +419,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -477,7 +478,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -567,7 +568,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 
maxAggregates, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -623,7 +624,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -670,7 +671,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -709,7 +710,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -740,7 +741,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), 
HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) aggregator.start() when: @@ -762,7 +763,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> false features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) ] @@ -794,7 +795,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) when: def async = CompletableFuture.supplyAsync(new Supplier() { @@ -827,7 +828,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -875,4 +876,10 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Thread.sleep(10) } } + + def sharedCommunicationObjects(features) { + def ret = new SharedCommunicationObjects() + ret.setFeaturesDiscovery(features) + ret + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy 
b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index 4a96460d604..da5fe6a26c4 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -1,6 +1,7 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.DDAgentFeaturesDiscovery +import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.WellKnownTags import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification @@ -26,14 +27,16 @@ class FootprintForkedTest extends DDSpecification { setup: CountDownLatch latch = new CountDownLatch(1) ValidatingSink sink = new ValidatingSink(latch) + SharedCommunicationObjects sco = new SharedCommunicationObjects() DDAgentFeaturesDiscovery features = Stub(DDAgentFeaturesDiscovery) { it.supportsMetrics() >> true it.peerTags() >> [] } + sco.setFeaturesDiscovery(features) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, - features, + sco, HealthMetrics.NO_OP, sink, 1000, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy index 8c7c64a275b..26103f11bed 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy @@ -12,6 +12,8 @@ import java.util.concurrent.CountDownLatch import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer +import java.util.concurrent.TimeUnit + class MetricsReliabilityTest extends DDCoreSpecification { static class State { @@ -87,6 +89,7 @@ class MetricsReliabilityTest extends 
DDCoreSpecification { when: "simulate an agent downgrade" + def discoveryState = featuresDiscovery.discoveryState state.reset(false, 404) tracer.startSpan("test", "test").finish() tracer.flush() @@ -94,6 +97,11 @@ class MetricsReliabilityTest extends DDCoreSpecification { then: "a discovery should have done - we do not support anymore stats calculation" state.latch.await() + // wait at least 5 seconds. the discovery is done asynchronously so we should wait for the internal state flip + def start = System.nanoTime() + while (discoveryState == featuresDiscovery.discoveryState && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) { + Thread.yield() + } assert !featuresDiscovery.supportsMetrics() // 2 traces processed. 1 p0 dropped. 2 requests and 1 downgrade no errors assertMetrics(healthMetrics, 2, 1, 2, 0, 1)