Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.monitor.Monitoring;
import datadog.trace.api.WellKnownTags;
import datadog.trace.core.CoreSpan;
Expand Down Expand Up @@ -32,20 +33,25 @@
@OutputTimeUnit(MICROSECONDS)
@Fork(value = 1)
public class ConflatingMetricsAggregatorBenchmark {
private final DDAgentFeaturesDiscovery featuresDiscovery =
new FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet());
private final ConflatingMetricsAggregator aggregator =
new ConflatingMetricsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
featuresDiscovery,
HealthMetrics.NO_OP,
new NullSink(),
2048,
2048);
private final SharedCommunicationObjects sco = new SharedCommunicationObjects();
private final ConflatingMetricsAggregator aggregator;
private final List<CoreSpan<?>> spans = generateTrace(64);

public ConflatingMetricsAggregatorBenchmark() {
sco.setFeaturesDiscovery(
new FixedAgentFeaturesDiscovery(
Collections.singleton("peer.hostname"), Collections.emptySet()));
aggregator =
new ConflatingMetricsAggregator(
new WellKnownTags("", "", "", "", "", ""),
Collections.emptySet(),
sco,
HealthMetrics.NO_OP,
new NullSink(),
2048,
2048);
}

static List<CoreSpan<?>> generateTrace(int len) {
final List<CoreSpan<?>> trace = new ArrayList<>();
for (int i = 0; i < len; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final Aggregator aggregator;
private final long reportingInterval;
private final TimeUnit reportingIntervalTimeUnit;
private final DDAgentFeaturesDiscovery features;
private final SharedCommunicationObjects sharedCommunicationObjects;
private volatile DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;

private volatile AgentTaskScheduler.Scheduled<?> cancellation;
Expand All @@ -110,7 +111,7 @@ public ConflatingMetricsAggregator(
this(
config.getWellKnownTags(),
config.getMetricsIgnoredResources(),
sharedCommunicationObjects.featuresDiscovery(config),
sharedCommunicationObjects,
healthMetrics,
new OkHttpSink(
sharedCommunicationObjects.okHttpClient,
Expand All @@ -126,15 +127,15 @@ public ConflatingMetricsAggregator(
ConflatingMetricsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetric,
Sink sink,
int maxAggregates,
int queueSize) {
this(
wellKnownTags,
ignoredResources,
features,
sharedCommunicationObjects,
healthMetric,
sink,
maxAggregates,
Expand All @@ -146,7 +147,7 @@ public ConflatingMetricsAggregator(
ConflatingMetricsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetric,
Sink sink,
int maxAggregates,
Expand All @@ -155,7 +156,7 @@ public ConflatingMetricsAggregator(
TimeUnit timeUnit) {
this(
ignoredResources,
features,
sharedCommunicationObjects,
healthMetric,
sink,
new SerializingMetricWriter(wellKnownTags, sink),
Expand All @@ -167,7 +168,7 @@ public ConflatingMetricsAggregator(

ConflatingMetricsAggregator(
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetric,
Sink sink,
MetricWriter metricWriter,
Expand All @@ -180,7 +181,7 @@ public ConflatingMetricsAggregator(
this.batchPool = new SpmcArrayQueue<>(maxAggregates);
this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3);
this.keys = new NonBlockingHashMap<>();
this.features = features;
this.sharedCommunicationObjects = sharedCommunicationObjects;
this.healthMetrics = healthMetric;
this.sink = sink;
this.aggregator =
Expand All @@ -198,6 +199,18 @@ public ConflatingMetricsAggregator(
this.reportingIntervalTimeUnit = timeUnit;
}

/**
 * Returns the features discovery, resolving it lazily through {@code sharedCommunicationObjects}
 * the first time it is needed and caching the result in the volatile {@code features} field.
 *
 * @return the cached discovery, or the freshly resolved one on first use
 */
private DDAgentFeaturesDiscovery featuresDiscovery() {
  DDAgentFeaturesDiscovery cached = features;
  if (cached == null) {
    // No locking here: sharedCommunicationObjects is expected to synchronise internally and hand
    // back the same instance, so racing threads at worst store the same reference more than once.
    cached = sharedCommunicationObjects.featuresDiscovery(Config.get());
    features = cached;
  }
  return cached;
}

@Override
public void start() {
sink.register(this);
Expand All @@ -213,13 +226,6 @@ public void start() {
log.debug("started metrics aggregator");
}

/**
 * Tells whether {@code features} currently reports metrics support.
 *
 * <p>If no metrics endpoint is known yet, {@code features} is first asked to run discovery if it
 * is outdated, and the support flag is read afterwards.
 *
 * @return the value of {@code features.supportsMetrics()} after the optional discovery
 */
private boolean isMetricsEnabled() {
  final boolean endpointUnknown = features.getMetricsEndpoint() == null;
  if (endpointUnknown) {
    features.discoverIfOutdated();
  }
  final boolean supported = features.supportsMetrics();
  return supported;
}

@Override
public boolean report() {
boolean published;
Expand All @@ -236,8 +242,7 @@ public boolean report() {

@Override
public Future<Boolean> forceReport() {
// Ensure the feature is enabled
if (!isMetricsEnabled()) {
if (!featuresDiscovery().supportsMetrics()) {
return CompletableFuture.completedFuture(false);
}
// Wait for the thread to start
Expand Down Expand Up @@ -273,6 +278,7 @@ public Future<Boolean> forceReport() {
public boolean publish(List<? extends CoreSpan<?>> trace) {
boolean forceKeep = false;
int counted = 0;
final DDAgentFeaturesDiscovery features = featuresDiscovery();
if (features.supportsMetrics()) {
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
Expand All @@ -283,7 +289,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
break;
}
counted++;
forceKeep |= publish(span, isTopLevel);
forceKeep |= publish(span, isTopLevel, features);
}
}
healthMetrics.onClientStatTraceComputed(
Expand All @@ -305,7 +311,7 @@ private boolean spanKindEligible(CoreSpan<?> span) {
return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
}

private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDiscovery features) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the signatures to avoid reading the volatile field multiple times while a single trace is being published.

final CharSequence spanKind = span.getTag(SPAN_KIND, "");
MetricKey newKey =
new MetricKey(
Expand All @@ -318,7 +324,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
span.getParentId() == 0,
SPAN_KINDS.computeIfAbsent(
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
getPeerTags(span, spanKind.toString()));
getPeerTags(span, spanKind.toString(), features));
boolean isNewKey = false;
MetricKey key = keys.putIfAbsent(newKey, newKey);
if (null == key) {
Expand Down Expand Up @@ -353,7 +359,8 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
return isNewKey || span.getError() > 0;
}

private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
private List<UTF8BytesString> getPeerTags(
CoreSpan<?> span, String spanKind, DDAgentFeaturesDiscovery features) {
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
List<UTF8BytesString> peerTags = new ArrayList<>();
for (String peerTag : features.peerTags()) {
Expand Down Expand Up @@ -417,8 +424,7 @@ public void onEvent(EventType eventType, String message) {
switch (eventType) {
case DOWNGRADED:
log.debug("Agent downgrade was detected");
disable();
healthMetrics.onClientStatDowngraded();
AgentTaskScheduler.get().execute(this::disable);
break;
case BAD_PAYLOAD:
log.debug("bad metrics payload sent to trace agent: {}", message);
Expand All @@ -434,9 +440,11 @@ public void onEvent(EventType eventType, String message) {
}

private void disable() {
final DDAgentFeaturesDiscovery features = featuresDiscovery();
features.discover();
if (!features.supportsMetrics()) {
log.debug("Disabling metric reporting because an agent downgrade was detected");
healthMetrics.onClientStatDowngraded();
this.pending.clear();
this.batchPool.clear();
this.inbox.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,8 +787,10 @@ private CoreTracer(
// Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds
// (using milliseconds granularity.) This avoids a fleet of traced applications starting at the
// same time from sending metrics in sync.
AgentTaskScheduler.get()
.scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS);
sharedCommunicationObjects.whenReady(
() ->
AgentTaskScheduler.get()
.scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS));

if (dataStreamsMonitoring == null) {
this.dataStreamsMonitoring =
Expand Down
Loading