From d16e45bac8d5efd5691e88e3fd39e628ddbff8ef Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 15:58:33 -0400 Subject: [PATCH 1/8] Cap per-field metric tag cardinality via Property/TagCardinalityHandler Replaces the per-field DDCache layer inside AggregateEntry with the two new cardinality handlers. Each per-field handler holds a small HashMap working set; when its budget is exhausted, subsequent values collapse to a stable "blocked_by_tracer" sentinel UTF8BytesString rather than growing without bound. The handlers are reset on the aggregator thread at the end of each report() cycle (10s default), so the cardinality budget refreshes per reporting interval. Caches replaced (limits preserved from the prior DDCache sizes): RESOURCE_HANDLER 32 SERVICE_HANDLER 32 OPERATION_HANDLER 64 SERVICE_SOURCE_HANDLER 16 TYPE_HANDLER 8 SPAN_KIND_HANDLER 16 HTTP_METHOD_HANDLER 8 HTTP_ENDPOINT_HANDLER 32 GRPC_STATUS_CODE_HANDLER 32 PEER_TAG_HANDLERS per-tag-name TagCardinalityHandler, each 512 Two production-only changes to the handlers as the user wrote them: - Fixed import: datadog.collections.tagmap6lazy.TagMap doesn't exist; TagCardinalityHandler now imports datadog.trace.api.TagMap which has the Entry API the handler uses. - Added TagCardinalityHandler.register(String) overload so AggregateEntry's peer-tag canonicalization doesn't have to allocate a TagMap.Entry per call -- the snapshot already carries peer-tag values as a flattened String[] {name, value, ...}. AggregateEntry split into two construction paths: - forSnapshot(snapshot, agg): the hot path; runs each field through the appropriate handler. - of(...): test-only factory; bypasses the handlers and creates UTF8 instances directly, so tests don't pollute static handler state. Content- equality on the resulting entry still matches the production-built one. Thread-safety: handlers are HashMap-backed and not safe for concurrent access. Both forSnapshot and resetCardinalityHandlers must be called from the aggregator thread. After the prior commits that moved MetricKey construction to the aggregator thread, this is the only thread that canonicalizes; the test factory path runs on test threads but doesn't touch the handlers. Reset semantics: clearing the handler's working set drops the {value -> UTF8BytesString} mapping but doesn't invalidate existing AggregateEntry fields -- those keep their UTF8BytesString references alive on their own. Subsequent snapshots with the same content still resolve to the existing entries via content-equality matches(). New values after reset get freshly allocated UTF8BytesStrings via the handler. Known limitation (not fixed here): hashOf(SpanSnapshot) hashes from the raw snapshot fields, not from the post-handler canonical form. So when cardinality is exceeded, multiple distinct raw values that collapse to the "blocked_by_tracer" sentinel still produce distinct hashes and land in different AggregateEntry buckets -- the wire payload will carry multiple rows that all label as blocked. This is the same behavior the prior DDCache-based design would have had at capacity. Collapsing those into a single sentinel entry would require canonicalizing before hashing and is a follow-up. Tests: new CardinalityHandlerTest covers PropertyCardinalityHandler and TagCardinalityHandler in isolation (hit/miss, over-limit blocking, reset behavior, sentinel stability). Existing ConflatingMetricAggregatorTest / SerializingMetricWriterTest / AggregateTableTest all pass unchanged because the test factory bypasses handlers. Benchmarks (2 forks x 5 iter x 15s) -- producer side unchanged because the handlers live on the consumer thread: SimpleSpan bench: 3.114 +- 0.045 us/op (prior: 3.123 +- 0.018) DDSpan bench: 2.364 +- 0.113 us/op (prior: 2.412 +- 0.022) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 279 +++++++++++------- .../trace/common/metrics/Aggregator.java | 3 + .../metrics/PropertyCardinalityHandler.java | 45 +++ .../common/metrics/TagCardinalityHandler.java | 76 +++++ .../metrics/CardinalityHandlerTest.java | 88 ++++++ 5 files changed, 384 insertions(+), 107 deletions(-) create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index e2fda9fde47..55536b7a8f3 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -1,19 +1,15 @@ package datadog.trace.common.metrics; -import static datadog.trace.api.Functions.UTF8_ENCODE; -import static datadog.trace.bootstrap.instrumentation.api.UTF8BytesString.EMPTY; - -import datadog.trace.api.Pair; -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.util.Hashtable; import datadog.trace.util.LongHashingUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.function.Function; +import java.util.Map; +import java.util.Objects; /** * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data @@ -24,45 +20,41 @@ * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry * instead of splitting. * - *

The static UTF8 caches that used to live on {@code MetricKey} and {@code - * ConflatingMetricsAggregator} are consolidated here. + *

UTF8 canonicalization runs through per-field {@link PropertyCardinalityHandler}s (and {@link + * TagCardinalityHandler}s for peer tags), so cardinality is capped per reporting interval and + * overflow values are bucketed into a {@code blocked_by_tracer} sentinel rather than allowed to + * grow without bound. The handlers are reset on the aggregator thread every reporting cycle via + * {@link #resetCardinalityHandlers()}. + * + *

Thread-safety: the cardinality handlers are not thread-safe. Only the aggregator thread + * may call {@link #forSnapshot} or {@link #resetCardinalityHandlers}. Test code uses {@link #of} + * which constructs entries without touching the handlers. */ final class AggregateEntry extends Hashtable.Entry { - // UTF8 caches consolidated from the previous MetricKey + ConflatingMetricsAggregator split. - private static final DDCache RESOURCE_CACHE = - DDCaches.newFixedSizeCache(32); - private static final DDCache SERVICE_CACHE = - DDCaches.newFixedSizeCache(32); - private static final DDCache OPERATION_CACHE = - DDCaches.newFixedSizeCache(64); - private static final DDCache SERVICE_SOURCE_CACHE = - DDCaches.newFixedSizeCache(16); - private static final DDCache TYPE_CACHE = DDCaches.newFixedSizeCache(8); - private static final DDCache SPAN_KIND_CACHE = - DDCaches.newFixedSizeCache(16); - private static final DDCache HTTP_METHOD_CACHE = - DDCaches.newFixedSizeCache(8); - private static final DDCache HTTP_ENDPOINT_CACHE = - DDCaches.newFixedSizeCache(32); - private static final DDCache GRPC_STATUS_CODE_CACHE = - DDCaches.newFixedSizeCache(32); - - /** - * Outer cache keyed by peer-tag name, with an inner per-name cache keyed by value. The inner - * cache produces the "name:value" encoded form the serializer writes. - */ - private static final DDCache< - String, Pair, Function>> - PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64); - - private static final Function< - String, Pair, Function>> - PEER_TAGS_CACHE_ADDER = - key -> - Pair.of( - DDCaches.newFixedSizeCache(512), - value -> UTF8BytesString.create(key + ":" + value)); + // Per-field cardinality limits. Identical to the prior DDCache sizes. + private static final PropertyCardinalityHandler RESOURCE_HANDLER = + new PropertyCardinalityHandler(32); + private static final PropertyCardinalityHandler SERVICE_HANDLER = + new PropertyCardinalityHandler(32); + private static final PropertyCardinalityHandler OPERATION_HANDLER = + new PropertyCardinalityHandler(64); + private static final PropertyCardinalityHandler SERVICE_SOURCE_HANDLER = + new PropertyCardinalityHandler(16); + private static final PropertyCardinalityHandler TYPE_HANDLER = new PropertyCardinalityHandler(8); + private static final PropertyCardinalityHandler SPAN_KIND_HANDLER = + new PropertyCardinalityHandler(16); + private static final PropertyCardinalityHandler HTTP_METHOD_HANDLER = + new PropertyCardinalityHandler(8); + private static final PropertyCardinalityHandler HTTP_ENDPOINT_HANDLER = + new PropertyCardinalityHandler(32); + private static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER = + new PropertyCardinalityHandler(32); + + /** Per-peer-tag-name {@link TagCardinalityHandler}, each sized to 512 distinct values. */ + private static final Map PEER_TAG_HANDLERS = new HashMap<>(); + + private static final int PEER_TAG_VALUE_LIMIT = 512; private final UTF8BytesString resource; private final UTF8BytesString service; @@ -84,39 +76,79 @@ final class AggregateEntry extends Hashtable.Entry { final AggregateMetric aggregate; - /** Hot-path constructor for the producer/consumer flow. Builds UTF8 fields via the caches. */ - private AggregateEntry(SpanSnapshot s, long keyHash, AggregateMetric aggregate) { + /** Field-bearing constructor used by both the hot path and the test factory. */ + private AggregateEntry( + long keyHash, + UTF8BytesString resource, + UTF8BytesString service, + UTF8BytesString operationName, + UTF8BytesString serviceSource, + UTF8BytesString type, + UTF8BytesString spanKind, + UTF8BytesString httpMethod, + UTF8BytesString httpEndpoint, + UTF8BytesString grpcStatusCode, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + String[] peerTagPairsRaw, + List peerTags, + AggregateMetric aggregate) { super(keyHash); - this.resource = canonicalize(RESOURCE_CACHE, s.resourceName); - this.service = SERVICE_CACHE.computeIfAbsent(s.serviceName, UTF8_ENCODE); - this.operationName = canonicalize(OPERATION_CACHE, s.operationName); - this.serviceSource = - s.serviceNameSource == null - ? null - : canonicalize(SERVICE_SOURCE_CACHE, s.serviceNameSource); - this.type = canonicalize(TYPE_CACHE, s.spanType); - this.spanKind = SPAN_KIND_CACHE.computeIfAbsent(s.spanKind, UTF8BytesString::create); - this.httpMethod = - s.httpMethod == null - ? null - : HTTP_METHOD_CACHE.computeIfAbsent(s.httpMethod, UTF8BytesString::create); - this.httpEndpoint = - s.httpEndpoint == null - ? null - : HTTP_ENDPOINT_CACHE.computeIfAbsent(s.httpEndpoint, UTF8BytesString::create); - this.grpcStatusCode = - s.grpcStatusCode == null - ? null - : GRPC_STATUS_CODE_CACHE.computeIfAbsent(s.grpcStatusCode, UTF8BytesString::create); - this.httpStatusCode = s.httpStatusCode; - this.synthetic = s.synthetic; - this.traceRoot = s.traceRoot; - this.peerTagPairsRaw = s.peerTagPairs; - this.peerTags = materializePeerTags(s.peerTagPairs); + this.resource = resource; + this.service = service; + this.operationName = operationName; + this.serviceSource = serviceSource; + this.type = type; + this.spanKind = spanKind; + this.httpMethod = httpMethod; + this.httpEndpoint = httpEndpoint; + this.grpcStatusCode = grpcStatusCode; + this.httpStatusCode = httpStatusCode; + this.synthetic = synthetic; + this.traceRoot = traceRoot; + this.peerTagPairsRaw = peerTagPairsRaw; + this.peerTags = peerTags; this.aggregate = aggregate; } - /** Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. */ + /** + * Production hot path: canonicalize each snapshot field via the cardinality handlers. Must be + * called on the aggregator thread. Null-valued fields short-circuit to {@link + * UTF8BytesString#EMPTY} (or {@code null} for optional ones) so they don't consume a cardinality + * slot. + */ + static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) { + return new AggregateEntry( + hashOf(s), + registerOrEmpty(RESOURCE_HANDLER, s.resourceName), + registerOrEmpty(SERVICE_HANDLER, s.serviceName), + registerOrEmpty(OPERATION_HANDLER, s.operationName), + s.serviceNameSource == null ? null : SERVICE_SOURCE_HANDLER.register(s.serviceNameSource), + registerOrEmpty(TYPE_HANDLER, s.spanType), + registerOrEmpty(SPAN_KIND_HANDLER, s.spanKind), + s.httpMethod == null ? null : HTTP_METHOD_HANDLER.register(s.httpMethod), + s.httpEndpoint == null ? null : HTTP_ENDPOINT_HANDLER.register(s.httpEndpoint), + s.grpcStatusCode == null ? null : GRPC_STATUS_CODE_HANDLER.register(s.grpcStatusCode), + s.httpStatusCode, + s.synthetic, + s.traceRoot, + s.peerTagPairs, + canonicalizePeerTags(s.peerTagPairs), + aggregate); + } + + private static UTF8BytesString registerOrEmpty( + PropertyCardinalityHandler handler, CharSequence value) { + return value == null ? UTF8BytesString.EMPTY : handler.register(value); + } + + /** + * Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. Bypasses + * the cardinality handlers so tests don't pollute their state -- {@link UTF8BytesString}s are + * created directly. Content-equality on the resulting entry still matches an entry built via + * {@link #forSnapshot} from a snapshot of the same shape. + */ static AggregateEntry of( CharSequence resource, CharSequence service, @@ -132,7 +164,7 @@ static AggregateEntry of( CharSequence httpEndpoint, CharSequence grpcStatusCode) { String[] rawPairs = peerTagsToRawPairs(peerTags); - SpanSnapshot synthetic_snapshot = + SpanSnapshot syntheticSnapshot = new SpanSnapshot( resource, service == null ? null : service.toString(), @@ -149,12 +181,43 @@ static AggregateEntry of( grpcStatusCode == null ? null : grpcStatusCode.toString(), 0L); return new AggregateEntry( - synthetic_snapshot, hashOf(synthetic_snapshot), new AggregateMetric()); + hashOf(syntheticSnapshot), + createUtf8(resource), + createUtf8(service), + createUtf8(operationName), + serviceSource == null ? null : createUtf8(serviceSource), + createUtf8(type), + createUtf8(spanKind), + httpMethod == null ? null : createUtf8(httpMethod), + httpEndpoint == null ? null : createUtf8(httpEndpoint), + grpcStatusCode == null ? null : createUtf8(grpcStatusCode), + (short) httpStatusCode, + synthetic, + traceRoot, + rawPairs, + peerTags == null ? Collections.emptyList() : peerTags, + new AggregateMetric()); } - /** Construct from a snapshot at consumer-thread miss time. */ - static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) { - return new AggregateEntry(s, hashOf(s), aggregate); + /** + * Resets every cardinality handler's working set. Must be called on the aggregator thread. + * Existing entries continue to hold their previously-issued {@link UTF8BytesString} references; + * matches() uses content-equality so snapshots delivered after a reset still resolve to the + * existing entries. + */ + static void resetCardinalityHandlers() { + RESOURCE_HANDLER.reset(); + SERVICE_HANDLER.reset(); + OPERATION_HANDLER.reset(); + SERVICE_SOURCE_HANDLER.reset(); + TYPE_HANDLER.reset(); + SPAN_KIND_HANDLER.reset(); + HTTP_METHOD_HANDLER.reset(); + HTTP_ENDPOINT_HANDLER.reset(); + GRPC_STATUS_CODE_HANDLER.reset(); + for (TagCardinalityHandler h : PEER_TAG_HANDLERS.values()) { + h.reset(); + } } boolean matches(SpanSnapshot s) { @@ -175,12 +238,9 @@ && stringContentEquals(httpEndpoint, s.httpEndpoint) /** * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no - * varargs / Object[] allocation, no autoboxing on primitive overloads. The constructor's - * super({@code hashOf(s)}) call uses the same function so an entry built from a snapshot hashes - * to the same bucket the snapshot itself looks up. - * - *

Hashes are content-stable across {@code String} / {@code UTF8BytesString}: {@link - * UTF8BytesString#hashCode()} returns the underlying {@code String}'s hash. + * varargs / Object[] allocation, no autoboxing on primitive overloads. Hashes are content-stable + * across {@code String} / {@code UTF8BytesString} because {@link UTF8BytesString#hashCode()} + * returns the underlying {@code String}'s hash. */ static long hashOf(SpanSnapshot s) { long h = 0; @@ -270,16 +330,16 @@ public boolean equals(Object o) { return httpStatusCode == that.httpStatusCode && synthetic == that.synthetic && traceRoot == that.traceRoot - && java.util.Objects.equals(resource, that.resource) - && java.util.Objects.equals(service, that.service) - && java.util.Objects.equals(operationName, that.operationName) - && java.util.Objects.equals(serviceSource, that.serviceSource) - && java.util.Objects.equals(type, that.type) - && java.util.Objects.equals(spanKind, that.spanKind) + && Objects.equals(resource, that.resource) + && Objects.equals(service, that.service) + && Objects.equals(operationName, that.operationName) + && Objects.equals(serviceSource, that.serviceSource) + && Objects.equals(type, that.type) + && Objects.equals(spanKind, that.spanKind) && peerTags.equals(that.peerTags) - && java.util.Objects.equals(httpMethod, that.httpMethod) - && java.util.Objects.equals(httpEndpoint, that.httpEndpoint) - && java.util.Objects.equals(grpcStatusCode, that.grpcStatusCode); + && Objects.equals(httpMethod, that.httpMethod) + && Objects.equals(httpEndpoint, that.httpEndpoint) + && Objects.equals(grpcStatusCode, that.grpcStatusCode); } @Override @@ -289,15 +349,15 @@ public int hashCode() { // ----- helpers ----- - private static UTF8BytesString canonicalize( - DDCache cache, CharSequence charSeq) { - if (charSeq == null) { - return EMPTY; + /** Direct {@link UTF8BytesString} creation that bypasses the cardinality handlers. */ + private static UTF8BytesString createUtf8(CharSequence cs) { + if (cs == null) { + return UTF8BytesString.EMPTY; } - if (charSeq instanceof UTF8BytesString) { - return (UTF8BytesString) charSeq; + if (cs instanceof UTF8BytesString) { + return (UTF8BytesString) cs; } - return cache.computeIfAbsent(charSeq.toString(), UTF8BytesString::create); + return UTF8BytesString.create(cs.toString()); } /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. */ @@ -326,28 +386,33 @@ private static boolean stringContentEquals(UTF8BytesString a, String b) { return b != null && a.toString().equals(b); } - private static List materializePeerTags(String[] pairs) { + /** Production-path peer-tag canonicalization via per-name {@link TagCardinalityHandler}. */ + private static List canonicalizePeerTags(String[] pairs) { if (pairs == null || pairs.length == 0) { return Collections.emptyList(); } if (pairs.length == 2) { - return Collections.singletonList(encodePeerTag(pairs[0], pairs[1])); + return Collections.singletonList(handlerFor(pairs[0]).register(pairs[1])); } List tags = new ArrayList<>(pairs.length / 2); for (int i = 0; i < pairs.length; i += 2) { - tags.add(encodePeerTag(pairs[i], pairs[i + 1])); + tags.add(handlerFor(pairs[i]).register(pairs[i + 1])); } return tags; } - private static UTF8BytesString encodePeerTag(String name, String value) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); - return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + private static TagCardinalityHandler handlerFor(String peerTagName) { + TagCardinalityHandler h = PEER_TAG_HANDLERS.get(peerTagName); + if (h != null) { + return h; + } + h = new TagCardinalityHandler(peerTagName, PEER_TAG_VALUE_LIMIT); + PEER_TAG_HANDLERS.put(peerTagName, h); + return h; } /** - * Inverse of {@link #materializePeerTags}: takes pre-encoded UTF8 peer tags and recovers the raw + * Inverse of {@link #canonicalizePeerTags}: takes pre-encoded UTF8 peer tags and recovers the raw * {@code [name0, value0, name1, value1, ...]} pairs. Used by the test factory {@link #of}, not by * the hot path. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index b4fc59d5a1d..9bcd41f37e4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -149,6 +149,9 @@ private void report(long when, SignalItem signal) { } dirty = false; } + // Reset cardinality handlers each report cycle so the per-field budgets refresh. + // Safe to call on this (aggregator) thread; handlers are HashMap-based and not thread-safe. + AggregateEntry.resetCardinalityHandlers(); signal.complete(); if (skipped) { log.debug("skipped metrics reporting because no points have changed"); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java new file mode 100644 index 00000000000..61560a32a71 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PropertyCardinalityHandler.java @@ -0,0 +1,45 @@ +package datadog.trace.common.metrics; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.HashMap; + +public final class PropertyCardinalityHandler { + private final int cardinalityLimit; + + private final HashMap curUtf8s; + + private UTF8BytesString cacheBlocked = null; + + public PropertyCardinalityHandler(int cardinalityLimit) { + this.cardinalityLimit = cardinalityLimit; + + // pre-sizing properly to avoid rehashing + this.curUtf8s = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1); + } + + public UTF8BytesString register(CharSequence value) { + if (this.curUtf8s.size() >= this.cardinalityLimit) { + return this.blockedByTracer(); + } + + UTF8BytesString existingUtf8 = this.curUtf8s.get(value); + if (existingUtf8 != null) return existingUtf8; + + // TODO: maybe use a fallback cache to reduce allocations across reset cycles + UTF8BytesString newUtf8 = UTF8BytesString.create(value); + this.curUtf8s.put(value, newUtf8); + return newUtf8; + } + + private UTF8BytesString blockedByTracer() { + UTF8BytesString cacheBlocked = this.cacheBlocked; + if (cacheBlocked != null) return cacheBlocked; + + this.cacheBlocked = cacheBlocked = UTF8BytesString.create("blocked_by_tracer"); + return cacheBlocked; + } + + public void reset() { + this.curUtf8s.clear(); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java new file mode 100644 index 00000000000..eeac6caf817 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java @@ -0,0 +1,76 @@ +package datadog.trace.common.metrics; + +import datadog.trace.api.TagMap; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.HashMap; + +public final class TagCardinalityHandler { + private final String tag; + private final int cardinalityLimit; + + private final HashMap curUtf8Pairs; + + private UTF8BytesString cacheBlocked = null; + + public TagCardinalityHandler(String tag, int cardinalityLimit) { + this.tag = tag; + this.cardinalityLimit = cardinalityLimit; + + // pre-sizing properly to avoid rehashing + this.curUtf8Pairs = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1); + } + + public UTF8BytesString register(TagMap.Entry entry) { + if (this.curUtf8Pairs.size() >= this.cardinalityLimit) { + return this.blockedByTracer(); + } + + if (!isValidType(entry)) { + return this.blockedByTracer(); + } + + // NOTE: This could lead to boxing -- not ideal + Object cacheKey = entry.objectValue(); + UTF8BytesString existing = this.curUtf8Pairs.get(cacheKey); + if (existing != null) return existing; + + // TODO: maybe use a fallback cache to reduce allocations across reset cycles + UTF8BytesString newPair = UTF8BytesString.create(this.tag + ":" + entry.stringValue()); + this.curUtf8Pairs.put(cacheKey, newPair); + return newPair; + } + + /** + * String-keyed overload for callers that already hold a {@code (tag, value)} pair as Strings and + * would rather not allocate a {@link TagMap.Entry} per lookup -- e.g. the metrics aggregator's + * peer-tag flow, where peer-tag values are flattened into a {@code String[]} on the snapshot. + */ + public UTF8BytesString register(String value) { + if (this.curUtf8Pairs.size() >= this.cardinalityLimit) { + return this.blockedByTracer(); + } + + UTF8BytesString existing = this.curUtf8Pairs.get(value); + if (existing != null) return existing; + + UTF8BytesString newPair = UTF8BytesString.create(this.tag + ":" + value); + this.curUtf8Pairs.put(value, newPair); + return newPair; + } + + private static final boolean isValidType(TagMap.Entry entry) { + return entry.isNumericPrimitive() || entry.objectValue() instanceof CharSequence; + } + + private UTF8BytesString blockedByTracer() { + UTF8BytesString cacheBlocked = this.cacheBlocked; + if (cacheBlocked != null) return cacheBlocked; + + this.cacheBlocked = cacheBlocked = UTF8BytesString.create(this.tag + ":blocked_by_tracer"); + return cacheBlocked; + } + + public void reset() { + this.curUtf8Pairs.clear(); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java new file mode 100644 index 00000000000..bbdffb6061a --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/CardinalityHandlerTest.java @@ -0,0 +1,88 @@ +package datadog.trace.common.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import org.junit.jupiter.api.Test; + +class CardinalityHandlerTest { + + @Test + void propertyReturnsSameInstanceForRepeatedValueUntilLimit() { + PropertyCardinalityHandler h = new PropertyCardinalityHandler(3); + UTF8BytesString a1 = h.register("a"); + UTF8BytesString a2 = h.register("a"); + assertSame(a1, a2); + assertEquals("a", a1.toString()); + } + + @Test + void propertyOverLimitReturnsBlockedSentinel() { + PropertyCardinalityHandler h = new PropertyCardinalityHandler(2); + UTF8BytesString a = h.register("a"); + UTF8BytesString b = h.register("b"); + UTF8BytesString blocked1 = h.register("c"); + UTF8BytesString blocked2 = h.register("d"); + + assertEquals("blocked_by_tracer", blocked1.toString()); + assertSame(blocked1, blocked2); // same sentinel for all overflow values + assertNotSame(blocked1, a); + assertNotSame(blocked1, b); + } + + @Test + void propertyResetRefreshesBudget() { + PropertyCardinalityHandler h = new PropertyCardinalityHandler(2); + h.register("a"); + h.register("b"); + UTF8BytesString blocked = h.register("c"); + assertEquals("blocked_by_tracer", blocked.toString()); + + h.reset(); + + // After reset, three distinct values fit again, but the previous instances aren't reused. + UTF8BytesString afterReset = h.register("a"); + assertEquals("a", afterReset.toString()); + UTF8BytesString c = h.register("c"); + assertEquals("c", c.toString()); + UTF8BytesString blockedAgain = h.register("d"); + UTF8BytesString blockedYetAgain = h.register("e"); + assertEquals("blocked_by_tracer", blockedAgain.toString()); + assertSame(blockedAgain, blockedYetAgain); + } + + @Test + void tagPrefixesValuesAndReusesUnderLimit() { + TagCardinalityHandler h = new TagCardinalityHandler("peer.hostname", 4); + UTF8BytesString first = h.register("host-a"); + UTF8BytesString second = h.register("host-a"); + UTF8BytesString other = h.register("host-b"); + + assertSame(first, second); + assertNotSame(first, other); + assertEquals("peer.hostname:host-a", first.toString()); + assertEquals("peer.hostname:host-b", other.toString()); + } + + @Test + void tagOverLimitReturnsTaggedSentinel() { + TagCardinalityHandler h = new TagCardinalityHandler("peer.service", 1); + h.register("svc-1"); + UTF8BytesString blocked = h.register("svc-2"); + assertEquals("peer.service:blocked_by_tracer", blocked.toString()); + } + + @Test + void tagResetRefreshesBudgetAndSentinelStaysStable() { + TagCardinalityHandler h = new TagCardinalityHandler("x", 1); + h.register("v1"); + UTF8BytesString blockedBefore = h.register("v2"); + h.reset(); + h.register("v1"); + UTF8BytesString blockedAfter = h.register("v2"); + // Both are the same sentinel instance (cacheBlocked is not cleared on reset). + assertSame(blockedBefore, blockedAfter); + } +} From 70a8a70af432c221ecec5ca5f8f688eb04d6d08e Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 16:14:35 -0400 Subject: [PATCH 2/8] Canonicalize SpanSnapshot before hashing so blocked values collapse The prior commit ran every snapshot through the cardinality handlers but still hashed the raw snapshot fields. When a field exceeded its cardinality budget the handlers collapsed many distinct values to a single "blocked_by_tracer" sentinel, but the raw hashes were still all different -- so the blocked entries fragmented across the AggregateTable. This commit makes hash + match work off the canonical (post-handler) UTF8BytesString fields, so blocked values land in the same bucket and merge into one entry. How the lookup path changes --------------------------- A new package-private AggregateEntry.Canonical scratch buffer: - holds the 10 canonical UTF8BytesString refs, primitives, peerTags list, and the precomputed keyHash; - exposes populate(SpanSnapshot) which runs each field through the appropriate handler and computes the long hash from the canonical refs; - exposes matches(AggregateEntry) for content-equality lookup; - exposes toEntry(AggregateMetric) which copies its refs into a fresh AggregateEntry on miss. AggregateTable holds one Canonical instance and reuses it per findOrInsert. On a hit nothing is allocated -- the buffer's refs feed the bucket walk and matches() directly. On a miss the refs are copied into the new entry and the buffer is overwritten on the next call. Hash function ------------- hashOf now takes UTF8BytesString fields (plus primitives + peerTags list) instead of raw CharSequence/String from the snapshot. UTF8BytesString.hashCode returns the underlying String's hash, so: - content-equal entries built via AggregateEntry.of(...) (test factory, bypasses handlers) produce the same hash as entries built via Canonical.toEntry(...) (production, via handlers); - all values that collapsed to "blocked_by_tracer" share that sentinel instance and therefore that hashCode -- they land in the same bucket and merge into one entry. Matches ------- The SpanSnapshot-keyed matches() on AggregateEntry is gone. Lookup goes through Canonical.matches(entry) which compares the buffer's UTF8 fields against the entry's UTF8 fields via Objects.equals (content equality on UTF8BytesString). This is needed because across handler resets the UTF8BytesString instance referenced by an existing entry differs from the freshly-issued instance for the same content -- content-equality lets the existing entry survive resets. The peerTagPairsRaw field on AggregateEntry was previously kept for matching against snapshot.peerTagPairs (the flat String[]). Canonical.matches uses List.equals on the encoded UTF8 peerTags directly, so peerTagPairsRaw is dropped. New test in AggregateTableTest -- cardinalityBlockedValuesCollapseIntoOneEntry inserts 50 distinct services into a table whose SERVICE_HANDLER has a cardinality limit of 32, and asserts the final size is 33 (the 32 in-budget services plus a single collapsed "blocked_by_tracer" entry, not 50 separate entries). Benchmarks (2 forks x 5 iter x 15s) -- producer side unchanged: SimpleSpan bench: 3.117 +- 0.026 us/op (prior: 3.114 +- 0.045) DDSpan bench: 2.344 +- 0.114 us/op (prior: 2.364 +- 0.113) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 407 +++++++++--------- .../trace/common/metrics/AggregateTable.java | 21 +- .../common/metrics/AggregateTableTest.java | 21 + 3 files changed, 247 insertions(+), 202 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index 55536b7a8f3..c28bf5722f6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -4,7 +4,6 @@ import datadog.trace.util.Hashtable; import datadog.trace.util.LongHashingUtils; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -15,40 +14,38 @@ * Hashtable entry for the consumer-side aggregator. Holds the UTF8-encoded label fields (the data * {@link SerializingMetricWriter} writes to the wire) plus the mutable {@link AggregateMetric}. * - *

{@link #matches(SpanSnapshot)} compares the entry's stored UTF8 forms against the snapshot's - * raw {@code CharSequence}/{@code String}/{@code String[]} fields via content-equality, so {@code - * String} vs {@code UTF8BytesString} mixing on the same logical key collapses into one entry - * instead of splitting. - * *

UTF8 canonicalization runs through per-field {@link PropertyCardinalityHandler}s (and {@link - * TagCardinalityHandler}s for peer tags), so cardinality is capped per reporting interval and - * overflow values are bucketed into a {@code blocked_by_tracer} sentinel rather than allowed to - * grow without bound. The handlers are reset on the aggregator thread every reporting cycle via - * {@link #resetCardinalityHandlers()}. + * TagCardinalityHandler}s for peer tags), so cardinality is capped per reporting interval. The + * critical property: hashing and matching happen after canonicalization, so when a field's + * cardinality budget is exhausted and overflow values collapse to a {@code blocked_by_tracer} + * sentinel, those values land in the same bucket and merge into a single entry rather than + * fragmenting. + * + *

The aggregator thread is the sole writer. {@link AggregateTable} holds a reusable {@link + * Canonical} scratch buffer so the canonicalization itself doesn't allocate per lookup; on a miss + * the buffer's references are copied into a fresh entry. On a hit nothing is allocated. * - *

Thread-safety: the cardinality handlers are not thread-safe. Only the aggregator thread - * may call {@link #forSnapshot} or {@link #resetCardinalityHandlers}. Test code uses {@link #of} - * which constructs entries without touching the handlers. + *

The handlers are reset on the aggregator thread every reporting cycle via {@link + * #resetCardinalityHandlers()}. + * + *

Thread-safety: the cardinality handlers and {@link Canonical} are not thread-safe. Only + * the aggregator thread may call {@link Canonical#populate} or {@link #resetCardinalityHandlers}. + * Test code uses {@link #of} which constructs entries without touching the handlers. */ final class AggregateEntry extends Hashtable.Entry { // Per-field cardinality limits. Identical to the prior DDCache sizes. - private static final PropertyCardinalityHandler RESOURCE_HANDLER = - new PropertyCardinalityHandler(32); - private static final PropertyCardinalityHandler SERVICE_HANDLER = - new PropertyCardinalityHandler(32); - private static final PropertyCardinalityHandler OPERATION_HANDLER = - new PropertyCardinalityHandler(64); - private static final PropertyCardinalityHandler SERVICE_SOURCE_HANDLER = - new PropertyCardinalityHandler(16); - private static final PropertyCardinalityHandler TYPE_HANDLER = new PropertyCardinalityHandler(8); - private static final PropertyCardinalityHandler SPAN_KIND_HANDLER = + static final PropertyCardinalityHandler RESOURCE_HANDLER = new PropertyCardinalityHandler(32); + static final PropertyCardinalityHandler SERVICE_HANDLER = new PropertyCardinalityHandler(32); + static final PropertyCardinalityHandler OPERATION_HANDLER = new PropertyCardinalityHandler(64); + static final PropertyCardinalityHandler SERVICE_SOURCE_HANDLER = new PropertyCardinalityHandler(16); - private static final PropertyCardinalityHandler HTTP_METHOD_HANDLER = - new PropertyCardinalityHandler(8); - private static final PropertyCardinalityHandler HTTP_ENDPOINT_HANDLER = + static final PropertyCardinalityHandler TYPE_HANDLER = new PropertyCardinalityHandler(8); + static final PropertyCardinalityHandler SPAN_KIND_HANDLER = new PropertyCardinalityHandler(16); + static final PropertyCardinalityHandler HTTP_METHOD_HANDLER = new PropertyCardinalityHandler(8); + static final PropertyCardinalityHandler HTTP_ENDPOINT_HANDLER = new PropertyCardinalityHandler(32); - private static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER = + static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER = new PropertyCardinalityHandler(32); /** Per-peer-tag-name {@link TagCardinalityHandler}, each sized to 512 distinct values. */ @@ -56,24 +53,19 @@ final class AggregateEntry extends Hashtable.Entry { private static final int PEER_TAG_VALUE_LIMIT = 512; - private final UTF8BytesString resource; - private final UTF8BytesString service; - private final UTF8BytesString operationName; - private final UTF8BytesString serviceSource; // nullable - private final UTF8BytesString type; - private final UTF8BytesString spanKind; - private final UTF8BytesString httpMethod; // nullable - private final UTF8BytesString httpEndpoint; // nullable - private final UTF8BytesString grpcStatusCode; // nullable - private final short httpStatusCode; - private final boolean synthetic; - private final boolean traceRoot; - - // Peer tags carried in two forms: raw String[] for matches() against the snapshot's pairs, - // and pre-encoded List ("name:value") for the serializer. - private final String[] peerTagPairsRaw; - private final List peerTags; - + final UTF8BytesString resource; + final UTF8BytesString service; + final UTF8BytesString operationName; + final UTF8BytesString serviceSource; // nullable + final UTF8BytesString type; + final UTF8BytesString spanKind; + final UTF8BytesString httpMethod; // nullable + final UTF8BytesString httpEndpoint; // nullable + final UTF8BytesString grpcStatusCode; // nullable + final short httpStatusCode; + final boolean synthetic; + final boolean traceRoot; + final List peerTags; final AggregateMetric aggregate; /** Field-bearing constructor used by both the hot path and the test factory. */ @@ -91,7 +83,6 @@ private AggregateEntry( short httpStatusCode, boolean synthetic, boolean traceRoot, - String[] peerTagPairsRaw, List peerTags, AggregateMetric aggregate) { super(keyHash); @@ -107,47 +98,15 @@ private AggregateEntry( this.httpStatusCode = httpStatusCode; this.synthetic = synthetic; this.traceRoot = traceRoot; - this.peerTagPairsRaw = peerTagPairsRaw; this.peerTags = peerTags; this.aggregate = aggregate; } - /** - * Production hot path: canonicalize each snapshot field via the cardinality handlers. Must be - * called on the aggregator thread. Null-valued fields short-circuit to {@link - * UTF8BytesString#EMPTY} (or {@code null} for optional ones) so they don't consume a cardinality - * slot. - */ - static AggregateEntry forSnapshot(SpanSnapshot s, AggregateMetric aggregate) { - return new AggregateEntry( - hashOf(s), - registerOrEmpty(RESOURCE_HANDLER, s.resourceName), - registerOrEmpty(SERVICE_HANDLER, s.serviceName), - registerOrEmpty(OPERATION_HANDLER, s.operationName), - s.serviceNameSource == null ? null : SERVICE_SOURCE_HANDLER.register(s.serviceNameSource), - registerOrEmpty(TYPE_HANDLER, s.spanType), - registerOrEmpty(SPAN_KIND_HANDLER, s.spanKind), - s.httpMethod == null ? null : HTTP_METHOD_HANDLER.register(s.httpMethod), - s.httpEndpoint == null ? null : HTTP_ENDPOINT_HANDLER.register(s.httpEndpoint), - s.grpcStatusCode == null ? null : GRPC_STATUS_CODE_HANDLER.register(s.grpcStatusCode), - s.httpStatusCode, - s.synthetic, - s.traceRoot, - s.peerTagPairs, - canonicalizePeerTags(s.peerTagPairs), - aggregate); - } - - private static UTF8BytesString registerOrEmpty( - PropertyCardinalityHandler handler, CharSequence value) { - return value == null ? UTF8BytesString.EMPTY : handler.register(value); - } - /** * Test-friendly factory mirroring the prior {@code new MetricKey(...)} positional args. Bypasses * the cardinality handlers so tests don't pollute their state -- {@link UTF8BytesString}s are - * created directly. Content-equality on the resulting entry still matches an entry built via - * {@link #forSnapshot} from a snapshot of the same shape. + * created directly. Content-equal entries from {@link Canonical#toEntry} still {@link #equals} an + * entry built via {@code of(...)}. */ static AggregateEntry of( CharSequence resource, @@ -163,47 +122,54 @@ static AggregateEntry of( CharSequence httpMethod, CharSequence httpEndpoint, CharSequence grpcStatusCode) { - String[] rawPairs = peerTagsToRawPairs(peerTags); - SpanSnapshot syntheticSnapshot = - new SpanSnapshot( - resource, - service == null ? null : service.toString(), - operationName, - serviceSource, - type, + UTF8BytesString resourceUtf = createUtf8(resource); + UTF8BytesString serviceUtf = createUtf8(service); + UTF8BytesString operationNameUtf = createUtf8(operationName); + UTF8BytesString serviceSourceUtf = serviceSource == null ? null : createUtf8(serviceSource); + UTF8BytesString typeUtf = createUtf8(type); + UTF8BytesString spanKindUtf = createUtf8(spanKind); + UTF8BytesString httpMethodUtf = httpMethod == null ? null : createUtf8(httpMethod); + UTF8BytesString httpEndpointUtf = httpEndpoint == null ? null : createUtf8(httpEndpoint); + UTF8BytesString grpcUtf = grpcStatusCode == null ? null : createUtf8(grpcStatusCode); + List peerTagsList = peerTags == null ? Collections.emptyList() : peerTags; + long keyHash = + hashOf( + resourceUtf, + serviceUtf, + operationNameUtf, + serviceSourceUtf, + typeUtf, + spanKindUtf, + httpMethodUtf, + httpEndpointUtf, + grpcUtf, (short) httpStatusCode, synthetic, traceRoot, - spanKind == null ? null : spanKind.toString(), - rawPairs, - httpMethod == null ? null : httpMethod.toString(), - httpEndpoint == null ? null : httpEndpoint.toString(), - grpcStatusCode == null ? null : grpcStatusCode.toString(), - 0L); + peerTagsList); return new AggregateEntry( - hashOf(syntheticSnapshot), - createUtf8(resource), - createUtf8(service), - createUtf8(operationName), - serviceSource == null ? null : createUtf8(serviceSource), - createUtf8(type), - createUtf8(spanKind), - httpMethod == null ? null : createUtf8(httpMethod), - httpEndpoint == null ? null : createUtf8(httpEndpoint), - grpcStatusCode == null ? null : createUtf8(grpcStatusCode), + keyHash, + resourceUtf, + serviceUtf, + operationNameUtf, + serviceSourceUtf, + typeUtf, + spanKindUtf, + httpMethodUtf, + httpEndpointUtf, + grpcUtf, (short) httpStatusCode, synthetic, traceRoot, - rawPairs, - peerTags == null ? Collections.emptyList() : peerTags, + peerTagsList, new AggregateMetric()); } /** * Resets every cardinality handler's working set. Must be called on the aggregator thread. * Existing entries continue to hold their previously-issued {@link UTF8BytesString} references; - * matches() uses content-equality so snapshots delivered after a reset still resolve to the - * existing entries. + * matches via content-equality so snapshots delivered after a reset still resolve to the existing + * entries. */ static void resetCardinalityHandlers() { RESOURCE_HANDLER.reset(); @@ -220,47 +186,42 @@ static void resetCardinalityHandlers() { } } - boolean matches(SpanSnapshot s) { - return httpStatusCode == s.httpStatusCode - && synthetic == s.synthetic - && traceRoot == s.traceRoot - && contentEquals(resource, s.resourceName) - && stringContentEquals(service, s.serviceName) - && contentEquals(operationName, s.operationName) - && contentEquals(serviceSource, s.serviceNameSource) - && contentEquals(type, s.spanType) - && stringContentEquals(spanKind, s.spanKind) - && Arrays.equals(peerTagPairsRaw, s.peerTagPairs) - && stringContentEquals(httpMethod, s.httpMethod) - && stringContentEquals(httpEndpoint, s.httpEndpoint) - && stringContentEquals(grpcStatusCode, s.grpcStatusCode); - } - /** - * Computes the 64-bit lookup hash for a {@link SpanSnapshot}. Chained per-field calls -- no - * varargs / Object[] allocation, no autoboxing on primitive overloads. Hashes are content-stable - * across {@code String} / {@code UTF8BytesString} because {@link UTF8BytesString#hashCode()} - * returns the underlying {@code String}'s hash. + * 64-bit lookup hash, computed over UTF8-encoded fields so that cardinality-blocked values (which + * all canonicalize to the same sentinel {@link UTF8BytesString}) collide in the same bucket. + * {@link UTF8BytesString#hashCode()} returns the underlying String hash, so entries built via + * {@link #of} produce the same hash as entries built from a snapshot with matching content. */ - static long hashOf(SpanSnapshot s) { + static long hashOf( + UTF8BytesString resource, + UTF8BytesString service, + UTF8BytesString operationName, + UTF8BytesString serviceSource, + UTF8BytesString type, + UTF8BytesString spanKind, + UTF8BytesString httpMethod, + UTF8BytesString httpEndpoint, + UTF8BytesString grpcStatusCode, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + List peerTags) { long h = 0; - h = LongHashingUtils.addToHash(h, s.resourceName); - h = LongHashingUtils.addToHash(h, s.serviceName); - h = LongHashingUtils.addToHash(h, s.operationName); - h = LongHashingUtils.addToHash(h, s.serviceNameSource); - h = LongHashingUtils.addToHash(h, s.spanType); - h = LongHashingUtils.addToHash(h, s.httpStatusCode); - h = LongHashingUtils.addToHash(h, s.synthetic); - h = LongHashingUtils.addToHash(h, s.traceRoot); - h = LongHashingUtils.addToHash(h, s.spanKind); - if (s.peerTagPairs != null) { - for (String p : s.peerTagPairs) { - h = LongHashingUtils.addToHash(h, p); - } + h = LongHashingUtils.addToHash(h, resource); + h = LongHashingUtils.addToHash(h, service); + h = LongHashingUtils.addToHash(h, operationName); + h = LongHashingUtils.addToHash(h, serviceSource); + h = LongHashingUtils.addToHash(h, type); + h = LongHashingUtils.addToHash(h, httpStatusCode); + h = LongHashingUtils.addToHash(h, synthetic); + h = LongHashingUtils.addToHash(h, traceRoot); + h = LongHashingUtils.addToHash(h, spanKind); + for (UTF8BytesString p : peerTags) { + h = LongHashingUtils.addToHash(h, p); } - h = LongHashingUtils.addToHash(h, s.httpMethod); - h = LongHashingUtils.addToHash(h, s.httpEndpoint); - h = LongHashingUtils.addToHash(h, s.grpcStatusCode); + h = LongHashingUtils.addToHash(h, httpMethod); + h = LongHashingUtils.addToHash(h, httpEndpoint); + h = LongHashingUtils.addToHash(h, grpcStatusCode); return h; } @@ -319,8 +280,8 @@ List getPeerTags() { /** * Equality on the 13 label fields (not on the aggregate). Used only by test mock matchers; the - * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link #matches(SpanSnapshot)} - * and never calls {@code equals}. + * {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link Canonical#matches} and + * never calls {@code equals}. */ @Override public boolean equals(Object o) { @@ -347,8 +308,114 @@ public int hashCode() { return (int) keyHash; } + /** + * Reusable scratch buffer for canonicalizing a {@link SpanSnapshot} into UTF8 fields, computing + * its lookup hash, comparing against existing entries, and building a fresh entry on miss. + * + *

One instance is held by an {@link AggregateTable} and reused on every {@code findOrInsert} + * call. Single-threaded use only. Fields are deliberately mutable -- this is a hot-path scratch + * area, not a value class. + */ + static final class Canonical { + UTF8BytesString resource; + UTF8BytesString service; + UTF8BytesString operationName; + UTF8BytesString serviceSource; // nullable + UTF8BytesString type; + UTF8BytesString spanKind; + UTF8BytesString httpMethod; // nullable + UTF8BytesString httpEndpoint; // nullable + UTF8BytesString grpcStatusCode; // nullable + short httpStatusCode; + boolean synthetic; + boolean traceRoot; + List peerTags; + long keyHash; + + /** Canonicalize all fields from {@code s} through the handlers into this buffer. */ + void populate(SpanSnapshot s) { + this.resource = registerOrEmpty(RESOURCE_HANDLER, s.resourceName); + this.service = registerOrEmpty(SERVICE_HANDLER, s.serviceName); + this.operationName = registerOrEmpty(OPERATION_HANDLER, s.operationName); + this.serviceSource = + s.serviceNameSource == null ? null : SERVICE_SOURCE_HANDLER.register(s.serviceNameSource); + this.type = registerOrEmpty(TYPE_HANDLER, s.spanType); + this.spanKind = registerOrEmpty(SPAN_KIND_HANDLER, s.spanKind); + this.httpMethod = s.httpMethod == null ? null : HTTP_METHOD_HANDLER.register(s.httpMethod); + this.httpEndpoint = + s.httpEndpoint == null ? null : HTTP_ENDPOINT_HANDLER.register(s.httpEndpoint); + this.grpcStatusCode = + s.grpcStatusCode == null ? null : GRPC_STATUS_CODE_HANDLER.register(s.grpcStatusCode); + this.httpStatusCode = s.httpStatusCode; + this.synthetic = s.synthetic; + this.traceRoot = s.traceRoot; + this.peerTags = canonicalizePeerTags(s.peerTagPairs); + this.keyHash = + hashOf( + resource, + service, + operationName, + serviceSource, + type, + spanKind, + httpMethod, + httpEndpoint, + grpcStatusCode, + httpStatusCode, + synthetic, + traceRoot, + peerTags); + } + + /** + * Whether this canonicalized snapshot matches the given entry. Compares UTF8 fields via + * content-equality (so an entry surviving a handler reset still matches a freshly-canonicalized + * snapshot of the same content). + */ + boolean matches(AggregateEntry e) { + return httpStatusCode == e.httpStatusCode + && synthetic == e.synthetic + && traceRoot == e.traceRoot + && Objects.equals(resource, e.resource) + && Objects.equals(service, e.service) + && Objects.equals(operationName, e.operationName) + && Objects.equals(serviceSource, e.serviceSource) + && Objects.equals(type, e.type) + && Objects.equals(spanKind, e.spanKind) + && peerTags.equals(e.peerTags) + && Objects.equals(httpMethod, e.httpMethod) + && Objects.equals(httpEndpoint, e.httpEndpoint) + && Objects.equals(grpcStatusCode, e.grpcStatusCode); + } + + /** Build a new entry from the currently-populated canonical fields. */ + AggregateEntry toEntry(AggregateMetric aggregate) { + return new AggregateEntry( + keyHash, + resource, + service, + operationName, + serviceSource, + type, + spanKind, + httpMethod, + httpEndpoint, + grpcStatusCode, + httpStatusCode, + synthetic, + traceRoot, + peerTags, + aggregate); + } + } + // ----- helpers ----- + private static UTF8BytesString registerOrEmpty( + PropertyCardinalityHandler handler, CharSequence value) { + return value == null ? UTF8BytesString.EMPTY : handler.register(value); + } + /** Direct {@link UTF8BytesString} creation that bypasses the cardinality handlers. */ private static UTF8BytesString createUtf8(CharSequence cs) { if (cs == null) { @@ -360,32 +427,6 @@ private static UTF8BytesString createUtf8(CharSequence cs) { return UTF8BytesString.create(cs.toString()); } - /** UTF8 vs raw CharSequence content-equality, no allocation in the common (String) case. */ - private static boolean contentEquals(UTF8BytesString a, CharSequence b) { - if (a == null) { - return b == null; - } - if (b == null) { - return false; - } - // UTF8BytesString.toString() returns the underlying String -- O(1), no allocation. - String aStr = a.toString(); - if (b instanceof String) { - return aStr.equals(b); - } - if (b instanceof UTF8BytesString) { - return aStr.equals(b.toString()); - } - return aStr.contentEquals(b); - } - - private static boolean stringContentEquals(UTF8BytesString a, String b) { - if (a == null) { - return b == null; - } - return b != null && a.toString().equals(b); - } - /** Production-path peer-tag canonicalization via per-name {@link TagCardinalityHandler}. */ private static List canonicalizePeerTags(String[] pairs) { if (pairs == null || pairs.length == 0) { @@ -410,24 +451,4 @@ private static TagCardinalityHandler handlerFor(String peerTagName) { PEER_TAG_HANDLERS.put(peerTagName, h); return h; } - - /** - * Inverse of {@link #canonicalizePeerTags}: takes pre-encoded UTF8 peer tags and recovers the raw - * {@code [name0, value0, name1, value1, ...]} pairs. Used by the test factory {@link #of}, not by - * the hot path. - */ - private static String[] peerTagsToRawPairs(List peerTags) { - if (peerTags == null || peerTags.isEmpty()) { - return null; - } - String[] pairs = new String[peerTags.size() * 2]; - int i = 0; - for (UTF8BytesString peerTag : peerTags) { - String s = peerTag.toString(); - int colon = s.indexOf(':'); - pairs[i++] = colon < 0 ? s : s.substring(0, colon); - pairs[i++] = colon < 0 ? "" : s.substring(colon + 1); - } - return pairs; - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java index 08300eab296..38d45ef5e85 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java @@ -4,13 +4,14 @@ import java.util.function.Consumer; /** - * Consumer-side {@link AggregateMetric} store, keyed on the raw fields of a {@link SpanSnapshot}. + * Consumer-side {@link AggregateMetric} store, keyed on the canonical UTF8-encoded labels of a + * {@link SpanSnapshot}. * - *

Replaces the prior {@code LRUCache}. The win is on the - * steady-state hit path: a snapshot lookup is a 64-bit hash compute + bucket walk + field-wise - * {@code matches}, with no per-snapshot {@link AggregateEntry} allocation and no UTF8 cache - * lookups. The UTF8-encoded forms (formerly held on {@code MetricKey}) live on the {@link - * AggregateEntry} itself and are built once per unique key at insert time. + *

{@link #findOrInsert} canonicalizes the snapshot's fields through the cardinality handlers (so + * cardinality-blocked values share a sentinel and collapse into one entry) and then computes the + * lookup hash from that canonical form. Canonicalization runs into a reusable {@link + * AggregateEntry.Canonical} scratch buffer; on a hit nothing is allocated, on a miss the buffer's + * references are copied into a fresh entry and the buffer is overwritten on the next call. * *

Not thread-safe. The aggregator thread is the sole writer; {@link #clear()} must be * routed through the inbox rather than called from arbitrary threads. @@ -19,6 +20,7 @@ final class AggregateTable { private final Hashtable.Entry[] buckets; private final int maxAggregates; + private final AggregateEntry.Canonical canonical = new AggregateEntry.Canonical(); private int size; AggregateTable(int maxAggregates) { @@ -40,12 +42,13 @@ boolean isEmpty() { * the caller should drop the data point in that case. */ AggregateMetric findOrInsert(SpanSnapshot snapshot) { - long keyHash = AggregateEntry.hashOf(snapshot); + canonical.populate(snapshot); + long keyHash = canonical.keyHash; int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash); for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) { if (e.keyHash == keyHash) { AggregateEntry candidate = (AggregateEntry) e; - if (candidate.matches(snapshot)) { + if (canonical.matches(candidate)) { return candidate.aggregate; } } @@ -53,7 +56,7 @@ AggregateMetric findOrInsert(SpanSnapshot snapshot) { if (size >= maxAggregates && !evictOneStale()) { return null; } - AggregateEntry entry = AggregateEntry.forSnapshot(snapshot, new AggregateMetric()); + AggregateEntry entry = canonical.toEntry(new AggregateMetric()); entry.setNext(buckets[bucketIndex]); buckets[bucketIndex] = entry; size++; diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 44f2b36cb6b..b8bf8fd1a3b 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -87,6 +87,27 @@ void peerTagPairsParticipateInIdentity() { assertEquals(3, table.size()); } + @Test + void cardinalityBlockedValuesCollapseIntoOneEntry() { + // SERVICE_HANDLER has a cardinality limit of 32. With 50 distinct service names, services 33+ + // canonicalize to the "blocked_by_tracer" sentinel. Because the table hashes from the canonical + // (post-handler) form, all blocked services land in the same bucket and merge into a single + // entry rather than fragmenting. + AggregateEntry.resetCardinalityHandlers(); + AggregateTable table = new AggregateTable(128); + + for (int i = 0; i < 50; i++) { + AggregateMetric agg = table.findOrInsert(snapshot("svc-" + i, "op", "client")); + assertNotNull(agg); + agg.recordOneDuration(1L); + } + + // 32 in-budget services + 1 collapsed "blocked_by_tracer" entry = 33 total. + assertEquals(33, table.size()); + + AggregateEntry.resetCardinalityHandlers(); + } + @Test void capOverrunEvictsStaleEntry() { AggregateTable table = new AggregateTable(2); From 1ad981af4fbc16114e42a2add69bd2fe209a5955 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 17:04:32 -0400 Subject: [PATCH 3/8] Defer peer-tag pair construction; capture values + canonicalize via schema-indexed handlers Replaces the producer's early {@code (name, value)}-pair encoding with a schema-based design: peer-tag values are captured into a parallel String array, and the consumer applies the matching {@link TagCardinalityHandler} by index using a {@link PeerTagSchema}'s parallel name/handler arrays. This removes the {@code Map} the prior commit left in {@code AggregateEntry} -- handler lookup is now a single array dereference instead of a hashmap probe. PeerTagSchema ------------- New package-private class that holds: - {@code String[] names} -- peer-tag names in stable order - {@code TagCardinalityHandler[] handlers} -- parallel to names Two schemas exist: a static singleton {@code INTERNAL} for the internal-kind {@code base.service} case, and a {@code CURRENT} schema for the peer- aggregation kinds (client/producer/consumer) that lazily refreshes when {@code features.peerTags()} returns a different set of names. Each {@link SpanSnapshot} captures the schema reference it was built against so producer and consumer agree on the indexing even if {@code CURRENT} changes between capture and consumption. A fast-path identity check (cached last input Set instance) keeps the {@code currentSyncedTo} call cheap: when the producer hands in the same Set instance as last time -- the steady-state case -- {@code currentSyncedTo} returns immediately without iterating names. The {@code matches()} loop only runs when the Set instance changes, which in production is rare (only on remote-config reconfiguration). Snapshot shape -------------- {@code SpanSnapshot.peerTagPairs} (a flat {@code [name0, value0, name1, value1, ...]} array) is replaced by: - {@code PeerTagSchema peerTagSchema} -- nullable; schema for the values - {@code String[] peerTagValues} -- parallel to schema.names The producer captures only values; the consumer constructs the encoded {@code "name:value"} UTF8 forms via {@code schema.handler(i).register(value)} on its own thread. Consumer-side cleanups bundled in --------------------------------- While here, also addresses the perf review items raised against the prior commit: - {@code hashOf}'s peer-tag loop is now indexed iteration; no more iterator allocation per snapshot. - {@code Canonical} now owns a reusable {@code peerTagsBuffer} ArrayList that's cleared+refilled per {@code populate} call -- zero allocation on the hit path. The buffer is copied into an immutable list only on miss when the entry needs to own it long-term. - {@code Canonical.matches} uses indexed list comparison; no iterator alloc in {@code List.equals}. - The {@code HashMap PEER_TAG_HANDLERS} on {@code AggregateEntry} is gone, replaced by the {@link PeerTagSchema}'s parallel array layout. Benchmark (2 forks x 5 iter x 15s) ---------------------------------- SimpleSpan bench: 3.165 +- 0.032 us/op (prior: 3.117 +- 0.026) DDSpan bench: 2.727 +- 0.018 us/op (prior: 2.344 +- 0.114) Some producer-side regression from the per-snapshot schema sync (volatile read + identity check). The fast-path identity comparison keeps it small; hoisting the sync out of the per-snapshot loop is possible but would change behavior in the edge case where {@code features.peerTags()} returns different Sets within a single trace (covered by an existing test). Choosing correctness over the marginal speedup. Tests ----- AggregateTableTest's snapshot builder is updated to construct a schema + values via {@code PeerTagSchema.currentSyncedTo}, exercising the same code path as production. Existing peer-tag test in {@code ConflatingMetricAggregatorTest} still passes unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/AggregateEntry.java | 107 +++++++++------ .../metrics/ConflatingMetricsAggregator.java | 72 ++++++----- .../trace/common/metrics/PeerTagSchema.java | 122 ++++++++++++++++++ .../trace/common/metrics/SpanSnapshot.java | 20 ++- .../common/metrics/AggregateTableTest.java | 24 +++- 5 files changed, 264 insertions(+), 81 deletions(-) create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index c28bf5722f6..225f03197e5 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -5,9 +5,7 @@ import datadog.trace.util.LongHashingUtils; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; /** @@ -48,11 +46,6 @@ final class AggregateEntry extends Hashtable.Entry { static final PropertyCardinalityHandler GRPC_STATUS_CODE_HANDLER = new PropertyCardinalityHandler(32); - /** Per-peer-tag-name {@link TagCardinalityHandler}, each sized to 512 distinct values. */ - private static final Map PEER_TAG_HANDLERS = new HashMap<>(); - - private static final int PEER_TAG_VALUE_LIMIT = 512; - final UTF8BytesString resource; final UTF8BytesString service; final UTF8BytesString operationName; @@ -181,9 +174,7 @@ static void resetCardinalityHandlers() { HTTP_METHOD_HANDLER.reset(); HTTP_ENDPOINT_HANDLER.reset(); GRPC_STATUS_CODE_HANDLER.reset(); - for (TagCardinalityHandler h : PEER_TAG_HANDLERS.values()) { - h.reset(); - } + PeerTagSchema.resetAll(); } /** @@ -216,8 +207,10 @@ static long hashOf( h = LongHashingUtils.addToHash(h, synthetic); h = LongHashingUtils.addToHash(h, traceRoot); h = LongHashingUtils.addToHash(h, spanKind); - for (UTF8BytesString p : peerTags) { - h = LongHashingUtils.addToHash(h, p); + // indexed iteration -- avoids the iterator allocation a for-each over a List would do + int peerTagCount = peerTags.size(); + for (int i = 0; i < peerTagCount; i++) { + h = LongHashingUtils.addToHash(h, peerTags.get(i)); } h = LongHashingUtils.addToHash(h, httpMethod); h = LongHashingUtils.addToHash(h, httpEndpoint); @@ -329,7 +322,14 @@ static final class Canonical { short httpStatusCode; boolean synthetic; boolean traceRoot; - List peerTags; + + /** + * Reusable buffer of canonicalized peer-tag UTF8 forms. Cleared and refilled in {@link + * #populate}; on miss, {@link #toEntry} copies it into an immutable list for the entry to own. + * Zero allocation on the hit path. + */ + final ArrayList peerTagsBuffer = new ArrayList<>(4); + long keyHash; /** Canonicalize all fields from {@code s} through the handlers into this buffer. */ @@ -349,7 +349,7 @@ void populate(SpanSnapshot s) { this.httpStatusCode = s.httpStatusCode; this.synthetic = s.synthetic; this.traceRoot = s.traceRoot; - this.peerTags = canonicalizePeerTags(s.peerTagPairs); + populatePeerTags(s.peerTagSchema, s.peerTagValues); this.keyHash = hashOf( resource, @@ -364,7 +364,26 @@ void populate(SpanSnapshot s) { httpStatusCode, synthetic, traceRoot, - peerTags); + peerTagsBuffer); + } + + /** + * Fills {@link #peerTagsBuffer} with canonical UTF8 forms, applying {@code schema.handler(i)} + * to each non-null value at the same index. No allocation when the schema/values are absent or + * all values are null (buffer is just cleared). + */ + private void populatePeerTags(PeerTagSchema schema, String[] values) { + peerTagsBuffer.clear(); + if (schema == null || values == null) { + return; + } + int n = schema.size(); + for (int i = 0; i < n; i++) { + String v = values[i]; + if (v != null) { + peerTagsBuffer.add(schema.handler(i).register(v)); + } + } } /** @@ -382,14 +401,41 @@ boolean matches(AggregateEntry e) { && Objects.equals(serviceSource, e.serviceSource) && Objects.equals(type, e.type) && Objects.equals(spanKind, e.spanKind) - && peerTags.equals(e.peerTags) + && peerTagsEqual(peerTagsBuffer, e.peerTags) && Objects.equals(httpMethod, e.httpMethod) && Objects.equals(httpEndpoint, e.httpEndpoint) && Objects.equals(grpcStatusCode, e.grpcStatusCode); } - /** Build a new entry from the currently-populated canonical fields. */ + /** Indexed list comparison -- avoids the iterator a {@code List.equals} would allocate. */ + private static boolean peerTagsEqual(List a, List b) { + int n = a.size(); + if (n != b.size()) { + return false; + } + for (int i = 0; i < n; i++) { + if (!a.get(i).equals(b.get(i))) { + return false; + } + } + return true; + } + + /** + * Build a new entry from the currently-populated canonical fields. The peer-tag buffer is + * copied into an immutable list so the entry's reference stays stable across subsequent {@link + * #populate} calls. + */ AggregateEntry toEntry(AggregateMetric aggregate) { + List snapshottedPeerTags; + int n = peerTagsBuffer.size(); + if (n == 0) { + snapshottedPeerTags = Collections.emptyList(); + } else if (n == 1) { + snapshottedPeerTags = Collections.singletonList(peerTagsBuffer.get(0)); + } else { + snapshottedPeerTags = new ArrayList<>(peerTagsBuffer); + } return new AggregateEntry( keyHash, resource, @@ -404,7 +450,7 @@ AggregateEntry toEntry(AggregateMetric aggregate) { httpStatusCode, synthetic, traceRoot, - peerTags, + snapshottedPeerTags, aggregate); } } @@ -426,29 +472,4 @@ private static UTF8BytesString createUtf8(CharSequence cs) { } return UTF8BytesString.create(cs.toString()); } - - /** Production-path peer-tag canonicalization via per-name {@link TagCardinalityHandler}. */ - private static List canonicalizePeerTags(String[] pairs) { - if (pairs == null || pairs.length == 0) { - return Collections.emptyList(); - } - if (pairs.length == 2) { - return Collections.singletonList(handlerFor(pairs[0]).register(pairs[1])); - } - List tags = new ArrayList<>(pairs.length / 2); - for (int i = 0; i < pairs.length; i += 2) { - tags.add(handlerFor(pairs[i]).register(pairs[i + 1])); - } - return tags; - } - - private static TagCardinalityHandler handlerFor(String peerTagName) { - TagCardinalityHandler h = PEER_TAG_HANDLERS.get(peerTagName); - if (h != null) { - return h; - } - h = new TagCardinalityHandler(peerTagName, PEER_TAG_VALUE_LIMIT); - PEER_TAG_HANDLERS.put(peerTagName, h); - return h; - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index c675fcb23c4..7497ed9a799 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -2,7 +2,6 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT; import static datadog.trace.api.DDSpanTypes.RPC; -import static datadog.trace.api.DDTags.BASE_SERVICE; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; @@ -294,6 +293,15 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { long tagAndDuration = span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); + PeerTagSchema peerTagSchema = peerTagSchemaFor(span); + String[] peerTagValues = + peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema); + if (peerTagValues == null) { + // capture returned no non-null values -- drop the schema reference so the consumer doesn't + // bother iterating an all-null array. + peerTagSchema = null; + } + SpanSnapshot snapshot = new SpanSnapshot( span.getResourceName(), @@ -305,7 +313,8 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { isSynthetic(span), span.getParentId() == 0, spanKind, - extractPeerTagPairs(span), + peerTagSchema, + peerTagValues, httpMethod, httpEndpoint, grpcStatusCode, @@ -317,41 +326,44 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { return error; } - private String[] extractPeerTagPairs(CoreSpan span) { + /** + * Picks the peer-tag schema for a span. For peer-aggregation kinds, syncs the schema with + * {@code features.peerTags()} so producer and consumer share the same name/handler ordering. + * For internal-kind spans returns the static {@link PeerTagSchema#INTERNAL} schema. + */ + private PeerTagSchema peerTagSchemaFor(CoreSpan span) { if (span.isKind(PEER_AGGREGATION_KINDS)) { - final Set eligiblePeerTags = features.peerTags(); - String[] pairs = null; - int count = 0; - for (String peerTag : eligiblePeerTags) { - Object value = span.unsafeGetTag(peerTag); - if (value != null) { - if (pairs == null) { - // pairs are flattened [name, value, ...]; size for worst case - pairs = new String[eligiblePeerTags.size() * 2]; - } - pairs[count++] = peerTag; - pairs[count++] = value.toString(); - } - } - if (pairs == null) { + Set eligible = features.peerTags(); + if (eligible == null || eligible.isEmpty()) { return null; } - if (count < pairs.length) { - String[] trimmed = new String[count]; - System.arraycopy(pairs, 0, trimmed, 0, count); - return trimmed; - } - return pairs; - } else if (span.isKind(INTERNAL_KIND)) { - // in this case only the base service should be aggregated if present - final Object baseService = span.unsafeGetTag(BASE_SERVICE); - if (baseService != null) { - return new String[] {BASE_SERVICE, baseService.toString()}; - } + return PeerTagSchema.currentSyncedTo(eligible); + } + if (span.isKind(INTERNAL_KIND)) { + return PeerTagSchema.INTERNAL; } return null; } + /** + * Captures the span's peer tag values into a {@code String[]} parallel to {@code schema.names}. + * Returns {@code null} when none of the configured peer tags are set on the span. + */ + private static String[] capturePeerTagValues(CoreSpan span, PeerTagSchema schema) { + int n = schema.size(); + String[] values = null; + for (int i = 0; i < n; i++) { + Object v = span.unsafeGetTag(schema.name(i)); + if (v != null) { + if (values == null) { + values = new String[n]; + } + values[i] = v.toString(); + } + } + return values; + } + private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java new file mode 100644 index 00000000000..f41b2634da6 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java @@ -0,0 +1,122 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.api.DDTags.BASE_SERVICE; + +import java.util.Set; + +/** + * Parallel arrays of peer-tag names and their {@link TagCardinalityHandler}s, indexed in lockstep. + * + *

Replaces the previous {@code Map} lookup with positional array + * access: the producer captures span tag values into a {@code String[]} parallel to {@link #names}, + * and the consumer applies {@link #handler(int)} at the same index to canonicalize. + * + *

Two schemas exist: + * + *

+ * + *

Each {@link SpanSnapshot} captures its own schema reference so producer and consumer agree on + * the indexing even if the current schema is replaced between capture and consumption. + * + *

Thread-safety: {@link #currentSyncedTo} may be called from producer threads; + * replacement of the volatile {@code CURRENT} reference is guarded by a lock. The {@link + * TagCardinalityHandler}s themselves are not thread-safe and must only be exercised on the + * aggregator thread (this is where the snapshot's schema is consumed). + */ +final class PeerTagSchema { + + private static final int VALUE_LIMIT_PER_TAG = 512; + + /** Singleton schema for internal-kind spans -- only {@code base.service}. */ + static final PeerTagSchema INTERNAL = new PeerTagSchema(new String[] {BASE_SERVICE}); + + /** Current schema for peer-aggregation kinds; replaced atomically when peer tag names change. */ + private static volatile PeerTagSchema CURRENT = new PeerTagSchema(new String[0]); + + /** + * Identity cache of the most recently observed {@code features.peerTags()} {@link Set} instance. + * The producer hot path checks this first and skips the {@code names}-vs-set comparison when the + * caller's set instance hasn't changed. In production this is the common case -- + * {@code DDAgentFeaturesDiscovery} returns the same Set instance until reconfiguration. + */ + private static volatile Set LAST_SYNCED_INPUT; + + final String[] names; + final TagCardinalityHandler[] handlers; + + private PeerTagSchema(String[] names) { + this.names = names; + this.handlers = new TagCardinalityHandler[names.length]; + for (int i = 0; i < names.length; i++) { + this.handlers[i] = new TagCardinalityHandler(names[i], VALUE_LIMIT_PER_TAG); + } + } + + /** + * Returns the current peer-aggregation schema, lazily refreshing it if the supplied {@code + * peerTagNames} differ from the cached set. Designed to be called from the producer hot path: the + * common case is a single volatile read and an array-length / set-contains comparison. + */ + static PeerTagSchema currentSyncedTo(Set peerTagNames) { + // Fast path: same Set instance as the last sync -> the cached schema is still valid, no + // matches() loop needed. In production this is the steady-state case. + if (peerTagNames == LAST_SYNCED_INPUT) { + return CURRENT; + } + PeerTagSchema cur = CURRENT; + if (matches(cur.names, peerTagNames)) { + LAST_SYNCED_INPUT = peerTagNames; + return cur; + } + synchronized (PeerTagSchema.class) { + cur = CURRENT; + if (!matches(cur.names, peerTagNames)) { + cur = new PeerTagSchema(peerTagNames.toArray(new String[0])); + CURRENT = cur; + } + LAST_SYNCED_INPUT = peerTagNames; + return cur; + } + } + + /** Resets the working sets of {@link #INTERNAL} and {@link #current()}. */ + static void resetAll() { + PeerTagSchema cur = CURRENT; + for (TagCardinalityHandler h : cur.handlers) { + h.reset(); + } + for (TagCardinalityHandler h : INTERNAL.handlers) { + h.reset(); + } + } + + int size() { + return names.length; + } + + String name(int i) { + return names[i]; + } + + TagCardinalityHandler handler(int i) { + return handlers[i]; + } + + private static boolean matches(String[] cur, Set set) { + if (cur.length != set.size()) { + return false; + } + for (String n : cur) { + if (!set.contains(n)) { + return false; + } + } + return true; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index b7f81712945..5967c1302c7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -21,10 +21,18 @@ final class SpanSnapshot implements InboxItem { final String spanKind; /** - * Flattened name/value pairs of peer-tag matches: {@code [name0, value0, name1, value1, ...]}. - * {@code null} when there are no matches (the common case). + * Schema for {@link #peerTagValues}. {@code null} when the span has no peer tags. The schema + * carries the names + {@link TagCardinalityHandler}s in parallel array form; {@code + * peerTagValues} holds the per-span tag values at the same indices. */ - final String[] peerTagPairs; + final PeerTagSchema peerTagSchema; + + /** + * Peer tag values captured from the span, parallel to {@code peerTagSchema.names}. A {@code null} + * entry means the span didn't have that peer tag set. {@code null} (the whole array) when {@link + * #peerTagSchema} is {@code null}. + */ + final String[] peerTagValues; final String httpMethod; final String httpEndpoint; @@ -43,7 +51,8 @@ final class SpanSnapshot implements InboxItem { boolean synthetic, boolean traceRoot, String spanKind, - String[] peerTagPairs, + PeerTagSchema peerTagSchema, + String[] peerTagValues, String httpMethod, String httpEndpoint, String grpcStatusCode, @@ -57,7 +66,8 @@ final class SpanSnapshot implements InboxItem { this.synthetic = synthetic; this.traceRoot = traceRoot; this.spanKind = spanKind; - this.peerTagPairs = peerTagPairs; + this.peerTagSchema = peerTagSchema; + this.peerTagValues = peerTagValues; this.httpMethod = httpMethod; this.httpEndpoint = httpEndpoint; this.grpcStatusCode = grpcStatusCode; diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index b8bf8fd1a3b..7a4f84c30dd 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -220,7 +220,8 @@ private static final class SnapshotBuilder { private final String service; private final String operation; private final String spanKind; - private String[] peerTagPairs; + private PeerTagSchema peerTagSchema; + private String[] peerTagValues; private long tagAndDuration = 0L; SnapshotBuilder(String service, String operation, String spanKind) { @@ -230,7 +231,23 @@ private static final class SnapshotBuilder { } SnapshotBuilder peerTags(String... namesAndValues) { - this.peerTagPairs = namesAndValues; + // Build a schema from the (name, value, name, value, ...) input. Synced through the + // production singleton so canonicalization actually goes through the same handlers the + // aggregator would use in production -- which is the surface the test wants to exercise. + java.util.LinkedHashSet names = new java.util.LinkedHashSet<>(); + for (int i = 0; i < namesAndValues.length; i += 2) { + names.add(namesAndValues[i]); + } + this.peerTagSchema = PeerTagSchema.currentSyncedTo(names); + this.peerTagValues = new String[peerTagSchema.size()]; + for (int i = 0; i < namesAndValues.length; i += 2) { + for (int j = 0; j < peerTagSchema.size(); j++) { + if (peerTagSchema.name(j).equals(namesAndValues[i])) { + peerTagValues[j] = namesAndValues[i + 1]; + break; + } + } + } return this; } @@ -245,7 +262,8 @@ SpanSnapshot build() { false, true, spanKind, - peerTagPairs, + peerTagSchema, + peerTagValues, null, null, null, From 7acb421474721c57761bfe39705e3417e8c8b097 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 17:35:16 -0400 Subject: [PATCH 4/8] Rename ConflatingMetricsAggregator to ClientStatsAggregator The "Conflating" in the name dates from the prior design that used a Batch pool + pending map to conflate up to 64 hits per inbox slot. That mechanism is gone -- the producer now publishes one SpanSnapshot per span and the consumer's AggregateTable is the conflation point. The new name matches the existing protocol/metric terminology (HealthMetrics.onClientStat*, stats.flush_payloads, etc.). File renames: ConflatingMetricsAggregator.java -> ClientStatsAggregator.java ConflatingMetricAggregatorTest.groovy -> ClientStatsAggregatorTest.groovy ConflatingMetricsAggregatorBenchmark -> ClientStatsAggregatorBenchmark ConflatingMetricsAggregatorDDSpan* -> ClientStatsAggregatorDDSpan* Plus all symbol references in MetricsAggregatorFactory and the test fixtures that referenced the old class name. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...va => ClientStatsAggregatorBenchmark.java} | 6 +- ...ClientStatsAggregatorDDSpanBenchmark.java} | 14 ++--- ...egator.java => ClientStatsAggregator.java} | 27 ++++----- .../metrics/MetricsAggregatorFactory.java | 2 +- ...roovy => ClientStatsAggregatorTest.groovy} | 60 +++++++++---------- .../common/metrics/FootprintForkedTest.groovy | 2 +- .../MetricsAggregatorFactoryTest.groovy | 2 +- 7 files changed, 56 insertions(+), 57 deletions(-) rename dd-trace-core/src/jmh/java/datadog/trace/common/metrics/{ConflatingMetricsAggregatorBenchmark.java => ClientStatsAggregatorBenchmark.java} (95%) rename dd-trace-core/src/jmh/java/datadog/trace/common/metrics/{ConflatingMetricsAggregatorDDSpanBenchmark.java => ClientStatsAggregatorDDSpanBenchmark.java} (85%) rename dd-trace-core/src/main/java/datadog/trace/common/metrics/{ConflatingMetricsAggregator.java => ClientStatsAggregator.java} (94%) rename dd-trace-core/src/test/groovy/datadog/trace/common/metrics/{ConflatingMetricAggregatorTest.groovy => ClientStatsAggregatorTest.groovy} (95%) diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java similarity index 95% rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java index b9a2f7f8c54..b9d72eaf3ab 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorBenchmark.java @@ -34,12 +34,12 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(MICROSECONDS) @Fork(value = 1) -public class ConflatingMetricsAggregatorBenchmark { +public class ClientStatsAggregatorBenchmark { private final DDAgentFeaturesDiscovery featuresDiscovery = new FixedAgentFeaturesDiscovery( Collections.singleton("peer.hostname"), Collections.emptySet()); - private final ConflatingMetricsAggregator aggregator = - new ConflatingMetricsAggregator( + private final ClientStatsAggregator aggregator = + new ClientStatsAggregator( new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), featuresDiscovery, diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java similarity index 85% rename from dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java rename to dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java index 02c6aaffc1a..06052c57ded 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorDDSpanBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ClientStatsAggregatorDDSpanBenchmark.java @@ -28,8 +28,8 @@ import org.openjdk.jmh.infra.Blackhole; /** - * Parallels {@link ConflatingMetricsAggregatorBenchmark} but uses real {@link DDSpan} instances - * instead of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link + * Parallels {@link ClientStatsAggregatorBenchmark} but uses real {@link DDSpan} instances instead + * of the lightweight {@code SimpleSpan} mock, so the JIT exercises the production {@link * CoreSpan#isKind} path (cached span.kind ordinal + bit-test) rather than the groovy mock's * dispatch. */ @@ -39,21 +39,21 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(MICROSECONDS) @Fork(value = 1) -public class ConflatingMetricsAggregatorDDSpanBenchmark { +public class ClientStatsAggregatorDDSpanBenchmark { private static final CoreTracer TRACER = CoreTracer.builder().writer(new NoopWriter()).strictTraceWrites(false).build(); private final DDAgentFeaturesDiscovery featuresDiscovery = - new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery( Collections.singleton("peer.hostname"), Collections.emptySet()); - private final ConflatingMetricsAggregator aggregator = - new ConflatingMetricsAggregator( + private final ClientStatsAggregator aggregator = + new ClientStatsAggregator( new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), featuresDiscovery, HealthMetrics.NO_OP, - new ConflatingMetricsAggregatorBenchmark.NullSink(), + new ClientStatsAggregatorBenchmark.NullSink(), 2048, 2048, false); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java similarity index 94% rename from dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java rename to dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java index 7497ed9a799..1b1aeec402a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java @@ -39,9 +39,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener { +public final class ClientStatsAggregator implements MetricsAggregator, EventListener { - private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class); + private static final Logger log = LoggerFactory.getLogger(ClientStatsAggregator.class); private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); @@ -75,7 +75,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private volatile AgentTaskScheduler.Scheduled cancellation; - public ConflatingMetricsAggregator( + public ClientStatsAggregator( Config config, SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetrics) { @@ -96,7 +96,7 @@ public ConflatingMetricsAggregator( config.isTraceResourceRenamingEnabled()); } - ConflatingMetricsAggregator( + ClientStatsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, DDAgentFeaturesDiscovery features, @@ -118,7 +118,7 @@ public ConflatingMetricsAggregator( includeEndpointInMetrics); } - ConflatingMetricsAggregator( + ClientStatsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, DDAgentFeaturesDiscovery features, @@ -142,7 +142,7 @@ public ConflatingMetricsAggregator( includeEndpointInMetrics); } - ConflatingMetricsAggregator( + ClientStatsAggregator( Set ignoredResources, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, @@ -327,9 +327,9 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { } /** - * Picks the peer-tag schema for a span. For peer-aggregation kinds, syncs the schema with - * {@code features.peerTags()} so producer and consumer share the same name/handler ordering. - * For internal-kind spans returns the static {@link PeerTagSchema#INTERNAL} schema. + * Picks the peer-tag schema for a span. For peer-aggregation kinds, syncs the schema with {@code + * features.peerTags()} so producer and consumer share the same name/handler ordering. For + * internal-kind spans returns the static {@link PeerTagSchema#INTERNAL} schema. */ private PeerTagSchema peerTagSchemaFor(CoreSpan span) { if (span.isKind(PEER_AGGREGATION_KINDS)) { @@ -411,17 +411,16 @@ private void disable() { if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); // Route the clear through the inbox so the aggregator thread is the only writer. - // AggregateTable is not thread-safe; calling clearAggregates() directly from this thread - // would race with Drainer.accept on the aggregator thread. + // AggregateTable is not thread-safe; clearing it directly from this thread would race + // with Drainer.accept on the aggregator thread. inbox.offer(CLEAR); } } - private static final class ReportTask - implements AgentTaskScheduler.Task { + private static final class ReportTask implements AgentTaskScheduler.Task { @Override - public void run(ConflatingMetricsAggregator target) { + public void run(ClientStatsAggregator target) { target.report(); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java index 09464310113..b9530871763 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java @@ -15,7 +15,7 @@ public static MetricsAggregator createMetricsAggregator( HealthMetrics healthMetrics) { if (config.isTracerMetricsEnabled()) { log.debug("tracer metrics enabled"); - return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics); + return new ClientStatsAggregator(config, sharedCommunicationObjects, healthMetrics); } log.debug("tracer metrics disabled"); return NoOpMetricsAggregator.INSTANCE; diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy similarity index 95% rename from dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy rename to dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy index 4dd0155443a..1fbdd63dff3 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy @@ -18,7 +18,7 @@ import java.util.concurrent.TimeoutException import java.util.function.Supplier import spock.lang.Shared -class ConflatingMetricAggregatorTest extends DDSpecification { +class ClientStatsAggregatorTest extends DDSpecification { static Set empty = new HashSet<>() @@ -35,7 +35,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( + ClientStatsAggregator aggregator = new ClientStatsAggregator( wellKnownTags, empty, features, @@ -65,7 +65,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( + ClientStatsAggregator aggregator = new ClientStatsAggregator( wellKnownTags, [ignoredResourceName].toSet(), features, @@ -103,7 +103,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -149,7 +149,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -195,7 +195,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -260,7 +260,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >>> [["country"], ["country", "georegion"],] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -327,7 +327,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -380,7 +380,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -432,7 +432,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) long duration = 100 List trace = [ @@ -504,7 +504,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -631,7 +631,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -746,7 +746,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -816,7 +816,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -888,7 +888,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -956,7 +956,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -990,7 +990,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1035,7 +1035,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1137,7 +1137,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1197,7 +1197,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1248,7 +1248,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1279,7 +1279,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { MetricWriter writer = Mock(MetricWriter) Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) aggregator.start() @@ -1301,7 +1301,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> false features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, false) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) @@ -1333,7 +1333,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) when: @@ -1366,7 +1366,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1413,7 +1413,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1468,7 +1468,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -1559,7 +1559,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1632,14 +1632,14 @@ class ConflatingMetricAggregatorTest extends DDSpecification { aggregator.close() } - def reportAndWaitUntilEmpty(ConflatingMetricsAggregator aggregator) { + def reportAndWaitUntilEmpty(ClientStatsAggregator aggregator) { waitUntilEmpty(aggregator) aggregator.report() waitUntilEmpty(aggregator) } - def waitUntilEmpty(ConflatingMetricsAggregator aggregator) { + def waitUntilEmpty(ClientStatsAggregator aggregator) { int i = 0 while (!aggregator.inbox.isEmpty() && i++ < 100) { Thread.sleep(10) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index eceedeb1935..86a91c23b3f 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -37,7 +37,7 @@ class FootprintForkedTest extends DDSpecification { it.supportsMetrics() >> true it.peerTags() >> [] } - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( + ClientStatsAggregator aggregator = new ClientStatsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, features, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy index 07f246bf9a9..dc9eb86fde3 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy @@ -28,6 +28,6 @@ class MetricsAggregatorFactoryTest extends DDSpecification { expect: def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP, ) - assert aggregator instanceof ConflatingMetricsAggregator + assert aggregator instanceof ClientStatsAggregator } } From a733cbf2e0c28b33d818a3981199744e3451ec56 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 17:35:34 -0400 Subject: [PATCH 5/8] Cleanups: fix previousCounts size, drop dead code Three small follow-ups carried over from a /techdebt pass: - TracerHealthMetrics: previousCounts array was sized 51, but the prior commits added a 52nd reporter (statsInboxFull). Without this fix the new counter's report() call would throw ArrayIndexOutOfBoundsException; the Flush task swallows that exception, so the failure would be silent (statsInboxFull would just never make it to statsd). - Aggregator: removes the now-dead public clearAggregates() method. The ClearSignal route from ClientStatsAggregator.disable() supplanted it several commits ago; the method had no remaining callers. - TagCardinalityHandler: removes the unused register(TagMap.Entry) overload and its isValidType helper. The String-keyed overload covers all current callers (AggregateEntry's peer-tag canonicalization). - PeerTagSchema: spotless-driven javadoc reflow only. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../trace/common/metrics/Aggregator.java | 4 --- .../trace/common/metrics/PeerTagSchema.java | 4 +-- .../common/metrics/TagCardinalityHandler.java | 32 +------------------ .../core/monitor/TracerHealthMetrics.java | 2 +- 4 files changed, 4 insertions(+), 38 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index 9bcd41f37e4..8fe25288acd 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -66,10 +66,6 @@ final class Aggregator implements Runnable { this.healthMetrics = healthMetrics; } - public void clearAggregates() { - this.aggregates.clear(); - } - @Override public void run() { Thread currentThread = Thread.currentThread(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java index f41b2634da6..4efaec4a0a2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java @@ -42,8 +42,8 @@ final class PeerTagSchema { /** * Identity cache of the most recently observed {@code features.peerTags()} {@link Set} instance. * The producer hot path checks this first and skips the {@code names}-vs-set comparison when the - * caller's set instance hasn't changed. In production this is the common case -- - * {@code DDAgentFeaturesDiscovery} returns the same Set instance until reconfiguration. + * caller's set instance hasn't changed. In production this is the common case -- {@code + * DDAgentFeaturesDiscovery} returns the same Set instance until reconfiguration. */ private static volatile Set LAST_SYNCED_INPUT; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java index eeac6caf817..1fdfed5c7c4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TagCardinalityHandler.java @@ -1,6 +1,5 @@ package datadog.trace.common.metrics; -import datadog.trace.api.TagMap; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import java.util.HashMap; @@ -8,7 +7,7 @@ public final class TagCardinalityHandler { private final String tag; private final int cardinalityLimit; - private final HashMap curUtf8Pairs; + private final HashMap curUtf8Pairs; private UTF8BytesString cacheBlocked = null; @@ -20,31 +19,6 @@ public TagCardinalityHandler(String tag, int cardinalityLimit) { this.curUtf8Pairs = new HashMap<>((int) Math.ceil(cardinalityLimit / 0.75) + 1); } - public UTF8BytesString register(TagMap.Entry entry) { - if (this.curUtf8Pairs.size() >= this.cardinalityLimit) { - return this.blockedByTracer(); - } - - if (!isValidType(entry)) { - return this.blockedByTracer(); - } - - // NOTE: This could lead to boxing -- not ideal - Object cacheKey = entry.objectValue(); - UTF8BytesString existing = this.curUtf8Pairs.get(cacheKey); - if (existing != null) return existing; - - // TODO: maybe use a fallback cache to reduce allocations across reset cycles - UTF8BytesString newPair = UTF8BytesString.create(this.tag + ":" + entry.stringValue()); - this.curUtf8Pairs.put(cacheKey, newPair); - return newPair; - } - - /** - * String-keyed overload for callers that already hold a {@code (tag, value)} pair as Strings and - * would rather not allocate a {@link TagMap.Entry} per lookup -- e.g. the metrics aggregator's - * peer-tag flow, where peer-tag values are flattened into a {@code String[]} on the snapshot. - */ public UTF8BytesString register(String value) { if (this.curUtf8Pairs.size() >= this.cardinalityLimit) { return this.blockedByTracer(); @@ -58,10 +32,6 @@ public UTF8BytesString register(String value) { return newPair; } - private static final boolean isValidType(TagMap.Entry entry) { - return entry.isNumericPrimitive() || entry.objectValue() instanceof CharSequence; - } - private UTF8BytesString blockedByTracer() { UTF8BytesString cacheBlocked = this.cacheBlocked; if (cacheBlocked != null) return cacheBlocked; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 76051645fcb..db384a7e42e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -382,7 +382,7 @@ private static class Flush implements AgentTaskScheduler.Task Date: Fri, 15 May 2026 17:49:48 -0400 Subject: [PATCH 6/8] Hoist peer-tag schema sync to once per trace ClientStatsAggregator.publish was calling features.peerTags() + PeerTagSchema.currentSyncedTo for every span. Peer-tag configuration is stable for the duration of a single trace publish in production -- DDAgentFeaturesDiscovery returns the same Set instance until remote-config reconfiguration -- so the per-snapshot sync is wasted work. Move the sync to once per publish(trace) and pass the resolved schema to the inner publish(span, isTopLevel, peerAggSchema). INTERNAL-kind spans still use the static PeerTagSchema.INTERNAL regardless. Behavior boundary ----------------- Schema changes from features.peerTags() now take effect at the next publish(trace) call rather than mid-trace. Production-equivalent (a trace takes microseconds to milliseconds; remote-config refreshes are seconds apart), but a Spock test that used `>>> [...]` to mock different peerTags() returns on successive calls within one trace no longer makes sense in the new model. That test is rewritten to assert the production-relevant case: peer-tag NAMES are stable, peer-tag VALUES vary per span, distinct value combinations produce distinct aggregate buckets. Benchmark (2 forks x 5 iter x 15s) ---------------------------------- SimpleSpan bench: 3.133 +- 0.057 us/op (prior: 3.165 +- 0.032) DDSpan bench: 2.454 +- 0.082 us/op (prior: 2.727 +- 0.018) Recovers ~270 ns/op on the DDSpan bench -- most of the regression introduced by the per-snapshot lookup. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../common/metrics/ClientStatsAggregator.java | 31 +++++++++++-------- .../metrics/ClientStatsAggregatorTest.groovy | 13 +++++--- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java index 1b1aeec402a..c199dd2b403 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java @@ -243,6 +243,14 @@ public boolean publish(List> trace) { boolean forceKeep = false; int counted = 0; if (features.supportsMetrics()) { + // Sync the peer-aggregation schema once per trace; peer-tag configuration is stable for + // the duration of a single trace publish in production (DDAgentFeaturesDiscovery returns + // the same Set instance until remote-config reconfiguration). + Set eligiblePeerTags = features.peerTags(); + PeerTagSchema peerAggSchema = + (eligiblePeerTags == null || eligiblePeerTags.isEmpty()) + ? null + : PeerTagSchema.currentSyncedTo(eligiblePeerTags); for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); if (shouldComputeMetric(span, isTopLevel)) { @@ -253,7 +261,7 @@ public boolean publish(List> trace) { break; } counted++; - forceKeep |= publish(span, isTopLevel); + forceKeep |= publish(span, isTopLevel, peerAggSchema); } } healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep); @@ -268,7 +276,7 @@ private boolean shouldComputeMetric(CoreSpan span, boolean isTopLevel) { && span.getDurationNano() > 0; } - private boolean publish(CoreSpan span, boolean isTopLevel) { + private boolean publish(CoreSpan span, boolean isTopLevel, PeerTagSchema peerAggSchema) { // Extract HTTP method and endpoint only if the feature is enabled String httpMethod = null; String httpEndpoint = null; @@ -293,7 +301,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { long tagAndDuration = span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); - PeerTagSchema peerTagSchema = peerTagSchemaFor(span); + PeerTagSchema peerTagSchema = peerTagSchemaFor(span, peerAggSchema); String[] peerTagValues = peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema); if (peerTagValues == null) { @@ -327,17 +335,14 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { } /** - * Picks the peer-tag schema for a span. For peer-aggregation kinds, syncs the schema with {@code - * features.peerTags()} so producer and consumer share the same name/handler ordering. For - * internal-kind spans returns the static {@link PeerTagSchema#INTERNAL} schema. + * Picks the peer-tag schema for a span. The {@code peerAggSchema} argument is the per-trace + * cached schema (synced from {@code features.peerTags()} once in {@link #publish(List)}); it's + * {@code null} when no peer tags are configured. For internal-kind spans the static {@link + * PeerTagSchema#INTERNAL} schema is used regardless. */ - private PeerTagSchema peerTagSchemaFor(CoreSpan span) { - if (span.isKind(PEER_AGGREGATION_KINDS)) { - Set eligible = features.peerTags(); - if (eligible == null || eligible.isEmpty()) { - return null; - } - return PeerTagSchema.currentSyncedTo(eligible); + private static PeerTagSchema peerTagSchemaFor(CoreSpan span, PeerTagSchema peerAggSchema) { + if (peerAggSchema != null && span.isKind(PEER_AGGREGATION_KINDS)) { + return peerAggSchema; } if (span.isKind(INTERNAL_KIND)) { return PeerTagSchema.INTERNAL; diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy index 1fbdd63dff3..3cccc50c5a4 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ClientStatsAggregatorTest.groovy @@ -253,13 +253,16 @@ class ClientStatsAggregatorTest extends DDSpecification { "client" | "GET" | "/external/api" | true } - def "should create bucket for each set of peer tags"() { + def "should create separate buckets for distinct peer tag values"() { + // Peer-tag NAMES are configured per-tracer and stable for the duration of a trace publish; + // peer-tag VALUES vary per-span. Two spans with the same names but different values should + // produce two distinct aggregate buckets. setup: MetricWriter writer = Mock(MetricWriter) Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.peerTags() >>> [["country"], ["country", "georegion"],] + features.peerTags() >> ["country", "georegion"] ClientStatsAggregator aggregator = new ClientStatsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -270,7 +273,7 @@ class ClientStatsAggregatorTest extends DDSpecification { new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"), new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) - .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe") + .setTag(SPAN_KIND, "client").setTag("country", "germany").setTag("georegion", "europe") ]) aggregator.report() def latchTriggered = latch.await(2, SECONDS) @@ -289,7 +292,7 @@ class ClientStatsAggregatorTest extends DDSpecification { false, false, "client", - [UTF8BytesString.create("country:france")], + [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")], null, null, null @@ -307,7 +310,7 @@ class ClientStatsAggregatorTest extends DDSpecification { false, false, "client", - [UTF8BytesString.create("country:france"), UTF8BytesString.create("georegion:europe")], + [UTF8BytesString.create("country:germany"), UTF8BytesString.create("georegion:europe")], null, null, null From 189fca929bc1bb1e9ebb4cca31537810d6c8d2d8 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 18:14:15 -0400 Subject: [PATCH 7/8] Use cached span.kind ordinal in metrics producer; drop tag-map lookup JFR profiling showed ~21% of producer CPU time spent in tag-map lookups during ClientStatsAggregator.publish. One of those lookups -- span.kind -- is redundant because DDSpanContext already caches the kind as a byte ordinal that resolves to a String via a small array. - Add CoreSpan.getSpanKindString() with a default that falls back to the tag map for non-DDSpan impls; DDSpan overrides to delegate to the context's cached resolution. - Hoist schema.names array out of the capturePeerTagValues loop. - Avoid an unnecessary toString() in isSynthetic by declaring SYNTHETICS_ORIGIN as String and using contentEquals. Benchmark (ClientStatsAggregatorDDSpanBenchmark): before: 2.410 us/op after: 1.995 us/op (~17% improvement) vs. master baseline (6.428 us/op): now ~3.2x faster. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../common/metrics/ClientStatsAggregator.java | 20 +++++++++++-------- .../java/datadog/trace/core/CoreSpan.java | 10 ++++++++++ .../main/java/datadog/trace/core/DDSpan.java | 5 +++++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java index c199dd2b403..d08ce611100 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java @@ -4,7 +4,6 @@ import static datadog.trace.api.DDSpanTypes.RPC; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ClearSignal.CLEAR; @@ -46,7 +45,7 @@ public final class ClientStatsAggregator implements MetricsAggregator, EventList private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; + private static final String SYNTHETICS_ORIGIN = "synthetics"; private static final SpanKindFilter METRICS_ELIGIBLE_KINDS = SpanKindFilter.builder() @@ -293,9 +292,12 @@ private boolean publish(CoreSpan span, boolean isTopLevel, PeerTagSchema peer Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE); grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null; } - // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString - // tag values don't trigger a ClassCastException on the String assignment. - final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString(); + // DDSpan resolves this from a cached span.kind ordinal via a small lookup array, skipping a + // tag-map lookup. Other CoreSpan impls fall back to the tag map by default. + String spanKind = span.getSpanKindString(); + if (spanKind == null) { + spanKind = ""; + } boolean error = span.getError() > 0; long tagAndDuration = @@ -355,10 +357,11 @@ private static PeerTagSchema peerTagSchemaFor(CoreSpan span, PeerTagSchema pe * Returns {@code null} when none of the configured peer tags are set on the span. */ private static String[] capturePeerTagValues(CoreSpan span, PeerTagSchema schema) { - int n = schema.size(); + String[] names = schema.names; + int n = names.length; String[] values = null; for (int i = 0; i < n; i++) { - Object v = span.unsafeGetTag(schema.name(i)); + Object v = span.unsafeGetTag(names[i]); if (v != null) { if (values == null) { values = new String[n]; @@ -370,7 +373,8 @@ private static String[] capturePeerTagValues(CoreSpan span, PeerTagSchema sch } private static boolean isSynthetic(CoreSpan span) { - return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); + CharSequence origin = span.getOrigin(); + return origin != null && SYNTHETICS_ORIGIN.contentEquals(origin); } public void stop() { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java index 7d183670883..810b13884de 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java @@ -82,6 +82,16 @@ default U unsafeGetTag(CharSequence name) { boolean isKind(SpanKindFilter filter); + /** + * Returns the {@code span.kind} tag value as a String, or {@code null} if not set. Default + * implementation reads the tag map; {@link DDSpan} overrides to use a cached ordinal that + * resolves via a small lookup array, skipping the tag-map lookup on the hot path. + */ + default String getSpanKindString() { + Object v = unsafeGetTag(datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND); + return v == null ? null : v.toString(); + } + CharSequence getType(); /** diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 4c438e1c915..943776e7577 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -963,6 +963,11 @@ public boolean isKind(SpanKindFilter filter) { return filter.matches(context.getSpanKindOrdinal()); } + @Override + public String getSpanKindString() { + return context.getSpanKindString(); + } + @Override public void copyPropagationAndBaggage(final AgentSpan source) { if (source instanceof DDSpan) { From 8020ec4df1a53841b9d9e88cc96a33e583050f97 Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Fri, 15 May 2026 18:53:46 -0400 Subject: [PATCH 8/8] Add client metrics pipeline design doc Captures the producer/consumer split, the canonical-key trick that makes cardinality-blocking actually save space, the once-per-trace peer-tag schema sync, the role of each file in datadog.trace.common.metrics, and the rationale behind the redesign from ConflatingMetricsAggregator. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/client_metrics_design.md | 308 ++++++++++++++++++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100644 docs/client_metrics_design.md diff --git a/docs/client_metrics_design.md b/docs/client_metrics_design.md new file mode 100644 index 00000000000..489763fd413 --- /dev/null +++ b/docs/client_metrics_design.md @@ -0,0 +1,308 @@ +# Client-side metrics (stats aggregator) design + +This document describes the design of the **client-side metrics pipeline** that +lives under `dd-trace-core/.../common/metrics/`. The pipeline aggregates per-span +duration / count / error statistics on the tracer and sends rolled-up "client +stats" payloads to the Datadog Agent on a fixed reporting interval, so the agent +does not have to sample every span to know request rates and latencies. + +Code lives in package `datadog.trace.common.metrics`. + +## High-level shape + +``` + producer thread(s) aggregator thread + inbox + trace ─▶ ClientStatsAggregator.publish(trace) ──MPSC──▶ Aggregator.run + │ │ + │ per metrics-eligible span │ Drainer.accept + │ │ + │ allocates one SpanSnapshot ▼ + │ (immutable, ~15 refs) AggregateTable.findOrInsert + │ │ + │ inbox.offer(snapshot) │ canonicalize → hash + └────────────────────────────────────▶ │ → lookup or insert + │ + scheduled REPORT signal ──▶│ + │ Aggregator.report + │ → MetricWriter.add(entry) + │ → OkHttpSink (HTTP POST) + │ → reset cardinality handlers +``` + +Three rules govern the design: + +1. **The producer never touches shared state.** The hot path on the application + thread builds an immutable `SpanSnapshot` and offers it to a bounded MPSC + queue. No locks, no maps, no hashing of the metric key. +2. **The aggregator thread is the sole writer of every shared structure.** The + aggregate table, the cardinality handlers, the metric writer state — all of + them are accessed only from that thread. Control operations (clear, report, + stop) are themselves enqueued as `SignalItem`s so they serialize with data. +3. **Cardinality is bounded.** Per-field handlers cap the unique values; once a + field's budget is exhausted, overflow values collapse into a single + `blocked_by_tracer` sentinel so the aggregate table can't blow up. + +## Component map + +| Component | File | Role | +|---|---|---| +| `ClientStatsAggregator` | `ClientStatsAggregator.java` | Producer facade. Decides which spans are eligible, builds `SpanSnapshot`s, offers them to the inbox. Also owns the agent-feature check, the scheduled report timer, and the agent-downgrade handler. | +| `SpanSnapshot` | `SpanSnapshot.java` | Immutable, allocation-pooled-by-GC value posted from producer → aggregator. Carries raw label fields plus a duration word with `TOP_LEVEL` / `ERROR` bits OR-ed in. | +| `PeerTagSchema` | `PeerTagSchema.java` | Parallel `String[] names` + `TagCardinalityHandler[] handlers` describing the peer-aggregation tags in effect. One singleton for internal-kind spans; one volatile "current" schema for client/producer/consumer spans, refreshed from `DDAgentFeaturesDiscovery.peerTags()`. | +| `Aggregator` | `Aggregator.java` | Consumer thread `Runnable`. Drains the inbox; dispatches `SpanSnapshot`s into `AggregateTable`; processes signals (`REPORT`, `CLEAR`, `STOP`); calls the writer on report. | +| `AggregateTable` | `AggregateTable.java` | Hashtable-backed store keyed on the canonicalized labels. Owns a single reusable `Canonical` scratch buffer. Handles cap-overflow by evicting one stale entry or rejecting new ones. | +| `AggregateEntry` | `AggregateEntry.java` | `Hashtable.Entry` holding the 13 UTF8 label fields + the mutable `AggregateMetric`. Owns the static `PropertyCardinalityHandler`s for the fixed label fields, and `Canonical` for hot-path canonicalization. | +| `AggregateMetric` | `AggregateMetric.java` | Per-bucket accumulator: hit count, error count, top-level count, duration sum, ok/error latency histograms. Single-threaded; cleared each report. | +| `PropertyCardinalityHandler` | `PropertyCardinalityHandler.java` | Per-field UTF8 interner with a max-unique-values cap. Returns a `blocked_by_tracer` sentinel `UTF8BytesString` once the cap is hit. Reset by the aggregator each cycle. | +| `TagCardinalityHandler` | `TagCardinalityHandler.java` | Same pattern as the property handler, but the cached UTF8 form is the full `tag:value` pair (peer tags are wire-encoded as `tag:value`, not just the value). | +| `SerializingMetricWriter` / `OkHttpSink` | `SerializingMetricWriter.java`, `OkHttpSink.java` | Wire serialization (MessagePack) + HTTP POST to the agent's `/v0.6/stats` endpoint. | +| `MetricsAggregatorFactory` / `NoOpMetricsAggregator` | factory + no-op | Picks the real implementation when client stats are enabled and the agent supports the endpoint, no-op otherwise. | + +## Producer-side flow (`ClientStatsAggregator.publish`) + +The producer holds **no shared state**. Per trace it: + +1. Snapshots the current peer-aggregation schema **once per trace** (not per + span): + ```java + Set eligiblePeerTags = features.peerTags(); + PeerTagSchema peerAggSchema = + (eligiblePeerTags == null || eligiblePeerTags.isEmpty()) + ? null + : PeerTagSchema.currentSyncedTo(eligiblePeerTags); + ``` + `currentSyncedTo` has a fast path: identity-equal to the previously-synced + `Set` instance → return the cached schema (the common case, since + `DDAgentFeaturesDiscovery` returns the same `Set` until remote-config + reconfiguration). The cached schema is `volatile`; replacement is guarded by + a `synchronized` block. + +2. Iterates the trace; for each metrics-eligible span: + + - **Eligibility** (`shouldComputeMetric`): + ```java + (measured || isTopLevel || isKind(SERVER|CLIENT|PRODUCER|CONSUMER)) + && longRunningVersion <= 0 + && durationNano > 0 + ``` + `isMeasured` / `isTopLevel` are flag reads on `DDSpanContext`; `isKind` + reads the **cached `byte` span-kind ordinal** through a `SpanKindFilter` + bitmask test — no tag-map lookup. + + - **Resource-name ignore-list** breaks out of the trace early; the entire + trace is dropped on a match. + + - **Picks the peer-tag schema** (`peerTagSchemaFor`): for client/producer/ + consumer kinds → `peerAggSchema` (already synced for this trace); for + internal-kind spans → `PeerTagSchema.INTERNAL` (single `base.service` + entry); otherwise `null`. + + - **Captures peer-tag *values***, not pairs: walks `schema.names` and pulls + `unsafeGetTag(name)` for each, into a parallel `String[]`. Names + handlers + are the schema's job; the producer only carries raw values. Returns `null` + when no peer tags are set, in which case the schema reference is dropped + too so the consumer doesn't loop over an all-null array. + + - **Builds and offers** a `SpanSnapshot` to the MPSC inbox. The span-kind + string is taken from `CoreSpan.getSpanKindString()`, which DDSpan + overrides to resolve via the cached byte ordinal through a small lookup + array — **no tag-map lookup**. Origin equality uses `contentEquals`. + `httpMethod` / `httpEndpoint` are only fetched when + `traceClientStatsEndpoints=true`; `grpcStatusCode` only when span type is + `rpc`. + + - On inbox-full: the snapshot is dropped and `healthMetrics.onStatsInboxFull()` + fires. The producer never blocks. + +3. Reports `healthMetrics.onClientStatTraceComputed(counted, total, dropped)`. + + `forceKeep` is the only signal returned upward — `true` if any of the + trace's metrics-eligible spans had errors, so the trace writer keeps the + raw trace too. + +### Why the producer is lean + +The cumulative cost of running these checks on every finished span is the +single biggest concern. The producer deliberately avoids: + +- locking or synchronization of any kind on the hot path, +- hashing the metric key (deferred to the aggregator thread), +- map / cache lookups for label canonicalization (deferred), +- tag-map lookups when a span carries the relevant information on the context + itself (`span.kind` via the cached byte ordinal; `isMeasured`, `isTopLevel` + via flag reads), +- allocation beyond the `SpanSnapshot` itself and a single `String[]` for peer + tag values when any are present. + +## Aggregator-side flow (`Aggregator.run`) + +A single agent thread runs the `Aggregator.run` loop. The thread drains the +inbox via `inbox.drain(drainer)`; when the queue is empty it sleeps +`DEFAULT_SLEEP_MILLIS` (10 ms) and retries. The Drainer dispatches by item +type: + +- `SpanSnapshot` → `AggregateTable.findOrInsert(snapshot)` returns either an + existing or freshly-inserted `AggregateMetric`, then the snapshot's + `tagAndDuration` is recorded. If the table is at capacity and no stale entry + can be evicted, `healthMetrics.onStatsAggregateDropped()` fires. + +- `ReportSignal` → on the scheduled cadence (the default report interval is + 10 s; configurable via `tracerMetricsMaxAggregates` / reporting interval), + `Aggregator.report`: + 1. Expunges entries with `hitCount == 0` (stale). + 2. If anything remains, opens a bucket via `MetricWriter.startBucket(...)`, + walks `AggregateTable.forEach`, writes each entry, clears its metric. + 3. Calls `MetricWriter.finishBucket()` (which may do I/O and block). + 4. **Resets all cardinality handlers** so the next interval starts with a + fresh budget. Existing entries keep their previously-issued UTF8 + references, and matching is by content-equality, so canonicalizing a + post-reset snapshot against an existing entry still resolves to the + same bucket. + +- `ClearSignal` → drops the aggregate state. The downgrade handler + (`onEvent(DOWNGRADED, ...)`) offers `CLEAR` to the inbox rather than calling + `clearAggregates()` directly, so the aggregator thread remains the sole + writer of the table. + +- `StopSignal` → final report + thread exit. + +## The canonical-key trick (cardinality-safe deduplication) + +The lookup hash is computed from the **canonicalized** label fields, not the +raw `SpanSnapshot` fields. This is the property that makes +cardinality-blocking actually save space: + +```java +// AggregateTable.findOrInsert +canonical.populate(snapshot); // runs every field through its handler +long keyHash = canonical.keyHash; +int bucketIndex = Hashtable.Support.bucketIndex(buckets, keyHash); +for (Hashtable.Entry e = buckets[bucketIndex]; e != null; e = e.next()) { + if (e.keyHash == keyHash) { + AggregateEntry candidate = (AggregateEntry) e; + if (canonical.matches(candidate)) { + return candidate.aggregate; + } + } +} +// miss → toEntry, splice into bucket head +``` + +`Canonical.populate` runs each label field through its +`PropertyCardinalityHandler` (or `TagCardinalityHandler` for peer tags). Once a +handler's working set is full, **every subsequent unique value resolves to the +same `UTF8BytesString` sentinel** — so the hash computed from the canonical +form is identical for all blocked values. They land in the same bucket and +merge into one `AggregateEntry` rather than fragmenting into N entries. + +The `Canonical` scratch buffer is reused per `findOrInsert` call. On a hit, +nothing is allocated. On a miss, `toEntry` snapshots the buffer's references +into a fresh entry; the buffer is overwritten on the next call. + +### Hash chain (no varargs) + +`AggregateEntry.hashOf` uses chained primitive calls into +`LongHashingUtils.addToHash(long, T)` rather than a varargs `addToHash(long, +Object...)`. This avoids the `Object[]` allocation and boxing of the primitive +fields (`httpStatusCode`, `synthetic`, `traceRoot`) that varargs would force. + +## Reporting cadence and cardinality reset + +Two distinct cadences: + +- **Reporting interval** (default 10 s): when the report timer fires, + `ReportTask` calls `report()` which `inbox.offer(REPORT)`. The aggregator + drains up to that signal, then writes the bucket and resets the cardinality + handlers. The handlers reset *every reporting cycle*, so the per-field + budgets refresh. + +- **Schema sync**: `PeerTagSchema.currentSyncedTo` runs on the producer thread + per trace, with an identity-check fast path. The schema reference is + replaced atomically when remote-config reconfigures the peer-tag set. + +## Memory and lifetime + +- `AggregateMetric` is **not thread-safe**. It is mutated only by the + aggregator thread. +- `AggregateTable` is **not thread-safe**. All paths (producer-side `CLEAR`, + schedule-driven `REPORT`, drainer-driven inserts) route through the inbox. +- `Canonical` and the cardinality handlers are aggregator-thread-only. +- `PeerTagSchema.CURRENT` is `volatile` with `synchronized` replacement; the + schema's `TagCardinalityHandler`s themselves are aggregator-thread-only and + are reset alongside the property handlers each cycle. +- Entries retain their `UTF8BytesString` references across handler resets; + matches via content-equality so post-reset snapshots still resolve. +- Cap: `tracerMetricsMaxAggregates` bounds table size. Cap-overrun policy: + evict one stale entry (`hitCount == 0`) or drop the new data point. + +## Health metrics + +The producer reports per-trace stats via `HealthMetrics`: + +- `onClientStatTraceComputed(counted, totalSpans, dropped)` — per `publish`. +- `onStatsInboxFull()` — when the MPSC queue rejects an offer. +- `onClientStatPayloadSent()` / `onClientStatDowngraded()` / + `onClientStatErrorReceived()` — on agent-side outcomes. +- `onStatsAggregateDropped()` — when the aggregator thread can't fit a new + entry. + +## Failure modes + +| Failure | Effect | +|---|---| +| Inbox full | Snapshot dropped, `onStatsInboxFull` increments, producer continues. | +| Agent unavailable / errors | `OkHttpSink` reports `BAD_PAYLOAD` / `ERROR`; metric reporting continues. | +| Agent downgrade (no /v0.6/stats) | `disable()` offers `CLEAR` to the inbox; the aggregator wipes its table. Producer's `features.supportsMetrics()` returns false on subsequent calls, so new snapshots are not built. | +| Aggregate table full, no stale entry | New snapshot dropped, `onStatsAggregateDropped` increments. Existing entries continue to accumulate. | +| Cardinality budget exhausted | Overflow values canonicalize to a `blocked_by_tracer` sentinel and merge into one bucket. Total entry count stays bounded by `maxAggregates`. | +| Producer throws mid-trace | Caught by the writer's normal error path; `onClientStatTraceComputed` is not called for that trace. | + +## Why the redesign (history) + +The pipeline was previously `ConflatingMetricsAggregator` with: + +- producer-side `MetricKey` construction (string-canonicalization on the hot + path), +- a `LRUCache` of `MetricKey → AggregateMetric`, +- per-tag `DDCache` instances for canonicalization (one per label field), +- early computation of `tag:value` peer pairs on the producer thread. + +The current `ClientStatsAggregator` shape was motivated by JMH benchmarks that +showed the producer dominating CPU time. The major shifts: + +1. **Move all canonicalization off the producer.** Producer just shuffles + references into a `SpanSnapshot`. +2. **Replace `MetricKey` with inlined fields on `AggregateEntry`.** Removes a + per-snapshot allocation; lets us own the hash code on the entry itself. +3. **Replace the `LRUCache` with a `Hashtable`** keyed on canonicalized labels. + Hash is computed once per insert/lookup; chained primitive hashing avoids + boxing. +4. **Replace per-tag `DDCache`s with per-field `PropertyCardinalityHandler`s** + that share a `blocked_by_tracer` sentinel for cardinality overflow. Reset + each reporting cycle. +5. **Capture peer-tag values, not pairs.** Tag-name + handler live on + `PeerTagSchema`; the producer carries values in a parallel `String[]`. The + aggregator does the `tag:value` interning via `TagCardinalityHandler` on + its own thread. +6. **Sync peer-tag schema once per trace.** `currentSyncedTo` has an + identity-check fast path; the steady-state cost is one volatile read. +7. **Single owner of all shared state.** `disable()` routes through `CLEAR` + rather than mutating the aggregate table directly. + +### Benchmark summary + +`ClientStatsAggregatorDDSpanBenchmark` (64 client-kind DDSpans per op, single +trace, real `CoreTracer` with a no-op writer): + +| Variant | µs/op | +|---|---| +| master (`ConflatingMetricsAggregator`, baseline) | 6.428 | +| with `SpanSnapshot` + background aggregation | 2.454 | +| with peer-tag schema hoist | 2.410 | +| with cached span-kind ordinal + isSynthetic fix | 1.995 | + +The remaining producer-thread hotspots (from JFR sampling) are tag-map +lookups for `peer.hostname` / other peer-tag values inside +`capturePeerTagValues`. A bulk peer-tag accessor on `DDSpan` would crack that +chunk further, but is a structural change beyond the current package.