Skip to content

Commit

Permalink
tracer: configure global tags via remote-config (#2378)
Browse files Browse the repository at this point in the history
Make global tags dynamic: support custom global tags via remote config
Report RC capabilities for custom tags
Report telemetry events on custom tag updates
  • Loading branch information
ahmed-mez committed Nov 23, 2023
1 parent 55aa930 commit 8beb30d
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 22 deletions.
24 changes: 22 additions & 2 deletions ddtrace/tracer/dynamic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type dynamicConfig[T any] struct {
startup T // holds the startup configuration value
cfgName string // holds the name of the configuration, has to be compatible with telemetry.Configuration.Name
cfgOrigin string // holds the origin of the current configuration value (currently only supports remote_config, empty otherwise)
apply func(T) bool // applies a configuration value
equal func(x, y T) bool // compares two configuration values
apply func(T) bool // executes any config-specific operations to propagate the update properly, returns whether the update was applied
equal func(x, y T) bool // compares two configuration values, this is used to avoid unnecessary config and telemetry updates
}

func newDynamicConfig[T any](name string, val T, apply func(T) bool, equal func(x, y T) bool) dynamicConfig[T] {
Expand All @@ -34,6 +34,13 @@ func newDynamicConfig[T any](name string, val T, apply func(T) bool, equal func(
}
}

// get returns the current configuration value
func (dc *dynamicConfig[T]) get() T {
dc.RLock()
defer dc.RUnlock()
return dc.current
}

// update applies a new configuration value
func (dc *dynamicConfig[T]) update(val T, origin string) bool {
dc.Lock()
Expand Down Expand Up @@ -95,3 +102,16 @@ func equalSlice[T comparable](x, y []T) bool {
}
return true
}

// equalMap compares two maps of comparable keys and values
func equalMap[T comparable](x, y map[T]interface{}) bool {
if len(x) != len(y) {
return false
}
for k, v := range x {
if yv, ok := y[k]; !ok || yv != v {
return false
}
}
return true
}
2 changes: 1 addition & 1 deletion ddtrace/tracer/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func checkEndpoint(c *http.Client, endpoint string) error {
// JSON format.
func logStartup(t *tracer) {
tags := make(map[string]string)
for k, v := range t.config.globalTags {
for k, v := range t.config.globalTags.get() {
tags[k] = fmt.Sprintf("%v", v)
}

Expand Down
32 changes: 24 additions & 8 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type config struct {

// globalTags holds a set of tags that will be automatically applied to
// all spans.
globalTags map[string]interface{}
globalTags dynamicConfig[map[string]interface{}]

// transport specifies the Transport interface which will be used to send data to the agent.
transport transport
Expand Down Expand Up @@ -409,22 +409,23 @@ func newConfig(opts ...StartOption) *config {
c.httpClient = defaultClient
}
WithGlobalTag(ext.RuntimeID, globalconfig.RuntimeID())(c)
globalTags := c.globalTags.get()
if c.env == "" {
if v, ok := c.globalTags["env"]; ok {
if v, ok := globalTags["env"]; ok {
if e, ok := v.(string); ok {
c.env = e
}
}
}
if c.version == "" {
if v, ok := c.globalTags["version"]; ok {
if v, ok := globalTags["version"]; ok {
if ver, ok := v.(string); ok {
c.version = ver
}
}
}
if c.serviceName == "" {
if v, ok := c.globalTags["service"]; ok {
if v, ok := globalTags["service"]; ok {
if s, ok := v.(string); ok {
c.serviceName = s
globalconfig.SetServiceName(s)
Expand Down Expand Up @@ -488,6 +489,9 @@ func newConfig(opts ...StartOption) *config {
}
c.dogstatsdAddr = addr
}
// Re-initialize the globalTags config with the value constructed from the environment and start options
// This allows persisting the initial value of globalTags for future resets and updates.
c.initGlobalTags(c.globalTags.get())

return c
}
Expand Down Expand Up @@ -703,7 +707,7 @@ func statsTags(c *config) []string {
if c.hostname != "" {
tags = append(tags, "host:"+c.hostname)
}
for k, v := range c.globalTags {
for k, v := range c.globalTags.get() {
if vstr, ok := v.(string); ok {
tags = append(tags, k+":"+vstr)
}
Expand Down Expand Up @@ -875,13 +879,25 @@ func WithPeerServiceMapping(from, to string) StartOption {
// created by tracer. This option may be used multiple times.
func WithGlobalTag(k string, v interface{}) StartOption {
return func(c *config) {
if c.globalTags == nil {
c.globalTags = make(map[string]interface{})
if c.globalTags.get() == nil {
c.initGlobalTags(map[string]interface{}{})
}
c.globalTags[k] = v
c.globalTags.Lock()
defer c.globalTags.Unlock()
c.globalTags.current[k] = v
}
}

// initGlobalTags initializes the globalTags config with the provided init value
func (c *config) initGlobalTags(init map[string]interface{}) {
apply := func(map[string]interface{}) bool {
// always set the runtime ID on updates
c.globalTags.current[ext.RuntimeID] = globalconfig.RuntimeID()
return true
}
c.globalTags = newDynamicConfig[map[string]interface{}]("trace_tags", init, apply, equalMap[string])
}

// WithSampler sets the given sampler to be used with the tracer. By default
// an all-permissive sampler is used.
func WithSampler(s Sampler) StartOption {
Expand Down
18 changes: 10 additions & 8 deletions ddtrace/tracer/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ func TestTracerOptionsDefaults(t *testing.T) {
c := tracer.config
assert.Equal(float64(0.5), c.sampler.(RateSampler).Rate())
assert.Equal(&url.URL{Scheme: "http", Host: "ddagent.consul.local:58126"}, c.agentURL)
assert.NotNil(c.globalTags)
assert.Equal("v", c.globalTags["k"])
assert.NotNil(c.globalTags.get())
assert.Equal("v", c.globalTags.get()["k"])
assert.Equal("testEnv", c.env)
assert.True(c.debug)
})
Expand All @@ -572,12 +572,13 @@ func TestTracerOptionsDefaults(t *testing.T) {
assert := assert.New(t)
c := newConfig()

assert.Equal("test", c.globalTags["env"])
assert.Equal("aVal", c.globalTags["aKey"])
assert.Equal("bVal", c.globalTags["bKey"])
assert.Equal("", c.globalTags["cKey"])
globalTags := c.globalTags.get()
assert.Equal("test", globalTags["env"])
assert.Equal("aVal", globalTags["aKey"])
assert.Equal("bVal", globalTags["bKey"])
assert.Equal("", globalTags["cKey"])

dVal, ok := c.globalTags["dKey"]
dVal, ok := globalTags["dKey"]
assert.False(ok)
assert.Equal(nil, dVal)
})
Expand Down Expand Up @@ -995,8 +996,9 @@ func TestTagSeparators(t *testing.T) {
os.Setenv("DD_TAGS", tag.in)
defer os.Unsetenv("DD_TAGS")
c := newConfig()
globalTags := c.globalTags.get()
for key, expected := range tag.out {
got, ok := c.globalTags[key]
got, ok := globalTags[key]
assert.True(ok, "tag not found")
assert.Equal(expected, got)
}
Expand Down
24 changes: 24 additions & 0 deletions ddtrace/tracer/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type target struct {
type libConfig struct {
SamplingRate *float64 `json:"tracing_sampling_rate,omitempty"`
HeaderTags *headerTags `json:"tracing_header_tags,omitempty"`
Tags *tags `json:"tracing_tags,omitempty"`
}

type headerTags []headerTag
Expand Down Expand Up @@ -58,6 +59,21 @@ func (ht headerTag) toString() string {
return sb.String()
}

type tags []string

func (t *tags) toMap() *map[string]interface{} {
if t == nil {
return nil
}
m := make(map[string]interface{}, len(*t))
for _, tag := range *t {
if kv := strings.SplitN(tag, ":", 2); len(kv) == 2 {
m[kv[0]] = kv[1]
}
}
return &m
}

// onRemoteConfigUpdate is a remote config callaback responsible for processing APM_TRACING RC-product updates.
func (t *tracer) onRemoteConfigUpdate(updates map[string]remoteconfig.ProductUpdate) map[string]state.ApplyStatus {
statuses := map[string]state.ApplyStatus{}
Expand Down Expand Up @@ -96,6 +112,10 @@ func (t *tracer) onRemoteConfigUpdate(updates map[string]remoteconfig.ProductUpd
if updated {
telemConfigs = append(telemConfigs, t.config.headerAsTags.toTelemetry())
}
updated = t.config.globalTags.handleRC(c.LibConfig.Tags.toMap())
if updated {
telemConfigs = append(telemConfigs, t.config.globalTags.toTelemetry())
}
}
if len(telemConfigs) > 0 {
log.Debug("Reporting %d configuration changes to telemetry", len(telemConfigs))
Expand Down Expand Up @@ -123,5 +143,9 @@ func (t *tracer) startRemoteConfig(rcConfig remoteconfig.ClientConfig) error {
if err != nil {
return err
}
err = remoteconfig.RegisterCapability(remoteconfig.APMTracingCustomTags)
if err != nil {
return err
}
return remoteconfig.RegisterCallback(t.onRemoteConfigUpdate)
}
51 changes: 51 additions & 0 deletions ddtrace/tracer/remote_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package tracer
import (
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
Expand Down Expand Up @@ -236,6 +237,52 @@ func TestOnRemoteConfigUpdate(t *testing.T) {
// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 0)
})

t.Run("DD_TAGS=key0:val0,key1:val1, WithGlobalTag=key2:val2 and RC tags = key3:val3,key4:val4", func(t *testing.T) {
telemetryClient := new(telemetrytest.MockClient)
defer telemetry.MockGlobalClient(telemetryClient)()

t.Setenv("DD_TAGS", "key0:val0,key1:val1")
tracer, _, _, stop := startTestTracer(t, WithService("my-service"), WithEnv("my-env"), WithGlobalTag("key2", "val2"))
defer stop()

// Apply RC. Assert global tags have the RC tags key3:val3,key4:val4 applied + runtime ID
input := map[string]remoteconfig.ProductUpdate{
"APM_TRACING": {"path": []byte(`{"lib_config": {"tracing_tags": ["key3:val3","key4:val4"]}, "service_target": {"service": "my-service", "env": "my-env"}}`)},
}
applyStatus := tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateAcknowledged, applyStatus["path"].State)
s := tracer.StartSpan("web.request").(*span)
s.Finish()
require.NotContains(t, "key0", s.Meta)
require.NotContains(t, "key1", s.Meta)
require.NotContains(t, "key2", s.Meta)
require.Equal(t, "val3", s.Meta["key3"])
require.Equal(t, "val4", s.Meta["key4"])
require.Equal(t, globalconfig.RuntimeID(), s.Meta[ext.RuntimeID])
runtimeIDTag := ext.RuntimeID + ":" + globalconfig.RuntimeID()

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 1)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_tags", Value: "key3:val3,key4:val4," + runtimeIDTag, Origin: "remote_config"}})

// Unset RC. Assert config shows the original DD_TAGS + WithGlobalTag + runtime ID
input["APM_TRACING"] = remoteconfig.ProductUpdate{"path": []byte(`{"lib_config": {}, "service_target": {"service": "my-service", "env": "my-env"}}`)}
applyStatus = tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateAcknowledged, applyStatus["path"].State)
s = tracer.StartSpan("web.request").(*span)
s.Finish()
require.Equal(t, "val0", s.Meta["key0"])
require.Equal(t, "val1", s.Meta["key1"])
require.Equal(t, "val2", s.Meta["key2"])
require.NotContains(t, "key3", s.Meta)
require.NotContains(t, "key4", s.Meta)
require.Equal(t, globalconfig.RuntimeID(), s.Meta[ext.RuntimeID])

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 2)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_tags", Value: "key0:val0,key1:val1,key2:val2," + runtimeIDTag, Origin: ""}})
})
}

func TestStartRemoteConfig(t *testing.T) {
Expand All @@ -254,4 +301,8 @@ func TestStartRemoteConfig(t *testing.T) {
found, err = remoteconfig.HasCapability(remoteconfig.APMTracingHTTPHeaderTags)
require.NoError(t, err)
require.True(t, found)

found, err = remoteconfig.HasCapability(remoteconfig.APMTracingCustomTags)
require.NoError(t, err)
require.True(t, found)
}
3 changes: 2 additions & 1 deletion ddtrace/tracer/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func startTelemetry(c *config) {
{Name: "orchestrion_enabled", Value: c.orchestrionCfg.Enabled},
c.traceSampleRate.toTelemetry(),
c.headerAsTags.toTelemetry(),
c.globalTags.toTelemetry(),
}
var peerServiceMapping []string
for key, value := range c.peerServiceMappings {
Expand All @@ -77,7 +78,7 @@ func startTelemetry(c *config) {
for k, v := range c.serviceMappings {
telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "service_mapping_" + k, Value: v})
}
for k, v := range c.globalTags {
for k, v := range c.globalTags.get() {
telemetryConfigs = append(telemetryConfigs, telemetry.Configuration{Name: "global_tag_" + k, Value: v})
}
rules := append(c.spanRules, c.traceRules...)
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOpt
span.SetTag(k, v)
}
// add global tags
for k, v := range t.config.globalTags {
for k, v := range t.config.globalTags.get() {
span.SetTag(k, v)
}
if t.config.serviceMappings != nil {
Expand Down
23 changes: 22 additions & 1 deletion internal/telemetry/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
package telemetry

import (
"fmt"
"math"
"sort"
"strings"
"testing"
)
Expand Down Expand Up @@ -52,7 +54,7 @@ func SetAgentlessEndpoint(endpoint string) string {
}

// Sanitize ensures the configuration values are valid and compatible.
// It removes NaN and Inf values and converts string slices into comma-separated strings.
// It removes NaN and Inf values and converts string slices and maps into comma-separated strings.
func Sanitize(c Configuration) Configuration {
switch val := c.Value.(type) {
case float64:
Expand All @@ -64,6 +66,25 @@ func Sanitize(c Configuration) Configuration {
case []string:
// The telemetry API only supports primitive types.
c.Value = strings.Join(val, ",")
case map[string]interface{}:
// The telemetry API only supports primitive types.
// Sort the keys to ensure the order is deterministic.
// This is technically not required but makes testing easier + it's not in a hot path.
keys := make([]string, 0, len(val))
for k := range val {
keys = append(keys, k)
}
sort.Strings(keys)
var sb strings.Builder
for _, k := range keys {
if sb.Len() > 0 {
sb.WriteString(",")
}
sb.WriteString(k)
sb.WriteString(":")
sb.WriteString(fmt.Sprint(val[k]))
}
c.Value = sb.String()
}
return c
}

0 comments on commit 8beb30d

Please sign in to comment.