Skip to content

Commit

Permalink
Introduce trace.datadog.tags.enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
ygree committed Jan 11, 2022
1 parent 1ba45be commit d2ba595
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public final class TracerConfig {
public static final String SAMPLING_MECHANISM_VALIDATION_DISABLED =
"trace.sampling.mechanism.validation.disabled";

/** Enables x-datadog-tags propagation and upstream_services tracking. Enabled by default. */
public static final String DATADOG_TAGS_ENABLED = "trace.datadog.tags.enabled";

/**
* Limit for x-datadog-tags. When exceeded it will stop propagating x-datadog-tags and log a
* warning. 512 by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,13 +543,10 @@ public Map<String, Object> getTags() {
}

public void processTagsAndBaggage(final MetadataConsumer consumer) {
Map<String, String> ddTagsMap = ddTags.parseTags();
if (ddTagsMap == null) {
log.warn("Malformed Datadog tags `{}` won't be sent to the backend!", ddTags);
}
Map<String, String> ddTagsMap = ddTags.parseAndMerge();
synchronized (unsafeTags) {
Map<String, String> ddTagsAndBaggageItems;
if (ddTagsMap != null) {
if (ddTagsMap != null && !ddTagsMap.isEmpty()) {
// merge datadog tags and baggage items
ddTagsAndBaggageItems = ddTagsMap;
ddTagsAndBaggageItems.putAll(baggageItems);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public ContextInterpreter reset() {
tags = Collections.emptyMap();
baggage = Collections.emptyMap();
valid = true;
ddTags = null;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ public <C> void inject(

DatadogTags datadogTags = context.getDatadogTags();
if (!datadogTags.isEmpty()) {
String encodedTags = datadogTags.encode();
String encodedTags = datadogTags.encodeAsHeaderValue();
int limit = context.getDatadogTagsLimit();
if (encodedTags.length() > limit) {
log.warn(
"{} exceeded limit of {} characters and will be dropped. Consider increasing the {} limit",
TAGS_KEY,
limit,
TracerConfig.DATADOG_TAGS_LIMIT);
// let backend know
// let the backend know about exceeding the limit
setter.set(carrier, TAGS_KEY, "_dd.propagation_error:max_size");
} else {
setter.set(carrier, TAGS_KEY, encodedTags);
Expand All @@ -78,13 +78,14 @@ public <C> void inject(
}
}

public static HttpCodec.Extractor newExtractor(final Map<String, String> tagMapping) {
public static HttpCodec.Extractor newExtractor(
final Map<String, String> tagMapping, final boolean isDatadogTagPropagationEnabled) {
return new TagContextExtractor(
tagMapping,
new ContextInterpreter.Factory() {
@Override
protected ContextInterpreter construct(Map<String, String> mapping) {
return new DatadogContextInterpreter(mapping);
return new DatadogContextInterpreter(mapping, isDatadogTagPropagationEnabled);
}
});
}
Expand All @@ -101,8 +102,12 @@ private static class DatadogContextInterpreter extends ContextInterpreter {
private static final int DD_TAGS = 7;
private static final int IGNORE = -1;

private DatadogContextInterpreter(Map<String, String> taggedHeaders) {
private final boolean isDatadogTagPropagationEnabled;

private DatadogContextInterpreter(
Map<String, String> taggedHeaders, boolean isDatadogTagPropagationEnabled) {
super(taggedHeaders);
this.isDatadogTagPropagationEnabled = isDatadogTagPropagationEnabled;
}

@Override
Expand Down Expand Up @@ -178,7 +183,8 @@ public boolean accept(String key, String value) {
endToEndStartTime = extractEndToEndStartTime(firstValue);
break;
case DD_TAGS:
ddTags = DatadogTags.create(value);
ddTags =
isDatadogTagPropagationEnabled ? DatadogTags.create(value) : DatadogTags.noop();
break;
case TAGS:
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,76 +1,20 @@
package datadog.trace.core.propagation;

import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.util.Base64Encoder;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Encapsulates x-datadog-tags logic */
public class DatadogTags {

private static final Base64Encoder BASE_64_ENCODER = new Base64Encoder(false);

private static final String UPSTREAM_SERVICES = "_dd.p.upstream_services";

private static final DecimalFormat RATE_FORMATTER = new DecimalFormat("#.####");
public abstract class DatadogTags {

public static DatadogTags empty() {
return new DatadogTags(null);
return new DatadogTagsTracking(null);
}

public static DatadogTags create(String value) {
return new DatadogTags(value);
}

private final String rawTags;

// assume that there is only one service and only latest sampling decision that matters
private volatile ServiceSamplingDecision samplingDecision;

private static final AtomicReferenceFieldUpdater<DatadogTags, ServiceSamplingDecision>
SAMPLING_DECISION_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
DatadogTags.class, ServiceSamplingDecision.class, "samplingDecision");

private static final class ServiceSamplingDecision {
private final String service;
private final int priority;
private final int mechanism;
private final double rate;

private ServiceSamplingDecision(String service, int priority, int mechanism, double rate) {
this.service = service;
this.priority = priority;
this.mechanism = mechanism;
this.rate = rate;
}

public void encode(StringBuilder sb) {
String serviceNameBase64 =
new String(
BASE_64_ENCODER.encode(service.getBytes(StandardCharsets.UTF_8)),
StandardCharsets.UTF_8);
sb.append(serviceNameBase64);
sb.append('|');
sb.append(priority);
sb.append('|');
sb.append(mechanism);
if (rate >= 0.0) {
sb.append('|');
sb.append(RATE_FORMATTER.format(rate));
}
}

public boolean isUnset() {
return priority == PrioritySampling.UNSET;
}
public static DatadogTags create(String rawTags) {
return new DatadogTagsTracking(rawTags);
}

public DatadogTags(String rawTags) {
this.rawTags = rawTags == null ? "" : rawTags;
public static DatadogTags noop() {
return DatadogTagsNoop.INSTANCE;
}

/**
Expand All @@ -81,137 +25,19 @@ public DatadogTags(String rawTags) {
* @param mechanism - sampling mechanism
* @param rate - sampling rate, pass a negative value if not applicable
*/
public void updateUpstreamServices(String serviceName, int priority, int mechanism, double rate) {
if (priority != PrioritySampling.UNSET
&& (samplingDecision == null
|| samplingDecision.priority != priority
|| samplingDecision.mechanism != mechanism
|| samplingDecision.rate != rate
|| !samplingDecision.service.equals(serviceName))) {
ServiceSamplingDecision newSamplingDecision =
new ServiceSamplingDecision(serviceName, priority, mechanism, rate);
SAMPLING_DECISION_UPDATER.compareAndSet(this, samplingDecision, newSamplingDecision);
}
}

public boolean isEmpty() {
return rawTags.isEmpty() && samplingDecision == null;
}
public abstract void updateUpstreamServices(
String serviceName, int priority, int mechanism, double rate);

/** @return encoded header value */
public String encode() {
boolean isSamplingDecisionEmpty = samplingDecision == null || samplingDecision.isUnset();
if (rawTags.isEmpty()) {
if (isSamplingDecisionEmpty) {
return "";
} else {
StringBuilder sb = new StringBuilder();
encodeUpstreamServices(sb);
return sb.toString();
}
} else if (isSamplingDecisionEmpty) {
// nothing has changed, return rawTags as is
return rawTags;
}
/** @return true if original rawTags and/or upstream services non-empty */
public abstract boolean isEmpty();

StringBuilder sb = new StringBuilder();

// find upstream services and the following coma or the end of the string
int upstreamStart = rawTags.indexOf(UPSTREAM_SERVICES);
if (upstreamStart < 0) {
// there is no upstream_services tag, append new to the end
sb.append(rawTags);
sb.append(',');
encodeUpstreamServices(sb);
return sb.toString();
}

// there is only upstream_services tag
int upstreamEnd = rawTags.indexOf(',', upstreamStart);
if (upstreamEnd < 0) {
// upstream_services tag is the last, prepend whole rawTags
sb.append(rawTags);
appendUpstreamServicesEncoded(sb, rawTags.length() - 1);
return sb.toString();
}

// there is a tag following upstream_services, prepend it as is to avoid parsing
sb.append(rawTags.subSequence(0, upstreamEnd));
appendUpstreamServicesEncoded(sb, upstreamEnd - 1);
// append following tags
sb.append(rawTags.subSequence(upstreamEnd, rawTags.length()));

return sb.toString();
}
/** @return encoded header value or an empty string */
public abstract String encodeAsHeaderValue();

/**
* Parses rawTags to a map and adds upstream_service to the result
* Parses rawTags to a map and merges the upstream_service tag if any to the result
*
* @return tags as a map or null when rawTags is malformed
* @return a new modifiable map containing tags or null
*/
public Map<String, String> parseTags() {
Map<String, String> result = new HashMap<>();
if (!rawTags.isEmpty()) {
int startIndex = 0;
// split rawTags and put them into the map
while (startIndex < rawTags.length()) {
String tagName;
int tagNamePosition = rawTags.indexOf('=', startIndex);
if (tagNamePosition > 0) {
tagName = rawTags.substring(startIndex, tagNamePosition);
} else {
// tag name without following `=`
return null;
}
startIndex = tagNamePosition + 1;

int tagValueEnds = rawTags.indexOf(',', startIndex);
if (tagValueEnds < 0) {
tagValueEnds = rawTags.length();
}
String tagValue = rawTags.substring(startIndex, tagValueEnds);
if (!tagValue.isEmpty()) {
result.put(tagName, tagValue);
}
startIndex = tagValueEnds + 1;
}
}
// add upstream_services
String upstreamServices = result.get(UPSTREAM_SERVICES);
StringBuilder sb = new StringBuilder();
boolean nonEmptySamplingDecision = samplingDecision != null;
if (upstreamServices != null) {
sb.append(upstreamServices);
if (nonEmptySamplingDecision) {
sb.append(';');
}
}
if (nonEmptySamplingDecision) {
samplingDecision.encode(sb);
}
if (sb.length() > 0) {
result.put(UPSTREAM_SERVICES, sb.toString());
}
return result;
}

@Override
public String toString() {
return rawTags;
}

private void appendUpstreamServicesEncoded(StringBuilder sb, int lastCharIndex) {
char lastChar = rawTags.charAt(lastCharIndex);
// check if a separator needed
if (lastChar != '=' && lastChar != ';') {
sb.append(';');
}
samplingDecision.encode(sb);
}

private void encodeUpstreamServices(StringBuilder sb) {
sb.append(UPSTREAM_SERVICES);
sb.append('=');
samplingDecision.encode(sb);
}
public abstract Map<String, String> parseAndMerge();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package datadog.trace.core.propagation;

import java.util.Map;

/** Noop implementation of DatadogTags. It's used when x-datadog-tags propagation is disabled. */
class DatadogTagsNoop extends DatadogTags {

static DatadogTags INSTANCE = new DatadogTagsNoop();

private DatadogTagsNoop() {}

@Override
public void updateUpstreamServices(String serviceName, int priority, int mechanism, double rate) {
// noop impl, do nothing
}

@Override
public boolean isEmpty() {
// always empty
return true;
}

@Override
public String encodeAsHeaderValue() {
return "";
}

@Override
public Map<String, String> parseAndMerge() {
return null;
}

@Override
public String toString() {
return null;
}
}

0 comments on commit d2ba595

Please sign in to comment.