Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracer: report config-change telemetry in dynamic config #2350

Merged
merged 6 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 55 additions & 18 deletions ddtrace/tracer/dynamic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,54 +7,91 @@ package tracer

import (
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
)

// dynamicConfig is a thread-safe generic data structure to represent configuration fields.
// It's designed to satisfy the dynamic configuration semantics (i.e reset, update, apply configuration changes).
// This structure will be extended to track the origin of configuration values as well (e.g remote_config, env_var).
type dynamicConfig[T any] struct {
sync.RWMutex
current T // holds the current configuration value
startup T // holds the startup configuration value
apply func(T) // applies a configuration value
isReset bool // internal boolean to avoid unnecessary resets
current T // holds the current configuration value
startup T // holds the startup configuration value
cfgName string // holds the name of the configuration, has to be compatible with telemetry.Configuration.Name
cfgOrigin string // holds the origin of the current configuration value (currently only supports remote_config, empty otherwise)
apply func(T) bool // applies a configuration value
equal func(x, y T) bool // compares two configuration values
}

func newDynamicConfig[T any](val T, apply func(T)) dynamicConfig[T] {
func newDynamicConfig[T any](name string, val T, apply func(T) bool, equal func(x, y T) bool) dynamicConfig[T] {
return dynamicConfig[T]{
cfgName: name,
current: val,
startup: val,
apply: apply,
isReset: true,
equal: equal,
}
}

// update applies a new configuration value
func (dc *dynamicConfig[T]) update(val T) {
func (dc *dynamicConfig[T]) update(val T, origin string) bool {
dc.Lock()
defer dc.Unlock()
if dc.equal(dc.current, val) {
return false
}
dc.current = val
dc.apply(val)
dc.isReset = false
dc.cfgOrigin = origin
return dc.apply(val)
}

// reset re-applies the startup configuration value
func (dc *dynamicConfig[T]) reset() {
func (dc *dynamicConfig[T]) reset() bool {
dc.Lock()
defer dc.Unlock()
if dc.isReset {
return
if dc.equal(dc.current, dc.startup) {
return false
}
dc.current = dc.startup
dc.apply(dc.startup)
dc.isReset = true
dc.cfgOrigin = ""
return dc.apply(dc.startup)
}

// handleRC processes a new configuration value from remote config
func (dc *dynamicConfig[T]) handleRC(val *T) {
// Returns whether the configuration value has been updated or not
func (dc *dynamicConfig[T]) handleRC(val *T) bool {
if val != nil {
dc.update(*val)
return
return dc.update(*val, "remote_config")
}
return dc.reset()
}

// toTelemetry returns the current configuration value as telemetry.Configuration
func (dc *dynamicConfig[T]) toTelemetry() telemetry.Configuration {
dc.RLock()
defer dc.RUnlock()
return telemetry.Sanitize(telemetry.Configuration{
Name: dc.cfgName,
Value: dc.current,
Origin: dc.cfgOrigin,
})
}

func equal[T comparable](x, y T) bool {
return x == y
}

// equalSlice compares two slices of comparable values
// The comparison takes into account the order of the elements
func equalSlice[T comparable](x, y []T) bool {
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
if len(x) != len(y) {
return false
}
for i, v := range x {
if v != y[i] {
return false
}
}
dc.reset()
return true
}
9 changes: 6 additions & 3 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func newConfig(opts ...StartOption) *config {
if v := os.Getenv("DD_SERVICE_MAPPING"); v != "" {
internal.ForEachStringTag(v, func(key, val string) { WithServiceMapping(key, val)(c) })
}
c.headerAsTags = newDynamicConfig(nil, setHeaderTags)
c.headerAsTags = newDynamicConfig("trace_header_tags", nil, setHeaderTags, equalSlice[string])
if v := os.Getenv("DD_TRACE_HEADER_TAGS"); v != "" {
WithHeaderTags(strings.Split(v, ","))(c)
}
Expand Down Expand Up @@ -1219,12 +1219,14 @@ func StackFrames(n, skip uint) FinishOption {
// Special headers can not be sub-selected. E.g., an entire Cookie header would be transmitted, without the ability to choose specific Cookies.
func WithHeaderTags(headerAsTags []string) StartOption {
return func(c *config) {
c.headerAsTags = newDynamicConfig(headerAsTags, setHeaderTags)
c.headerAsTags = newDynamicConfig("trace_header_tags", headerAsTags, setHeaderTags, equalSlice[string])
setHeaderTags(headerAsTags)
}
}

func setHeaderTags(headerAsTags []string) {
// setHeaderTags sets the global header tags.
// Always resets the global value and returns true.
func setHeaderTags(headerAsTags []string) bool {
globalconfig.ClearHeaderTags()
for _, h := range headerAsTags {
if strings.HasPrefix(h, "x-datadog-") {
Expand All @@ -1233,6 +1235,7 @@ func setHeaderTags(headerAsTags []string) {
header, tag := normalizer.HeaderTag(h)
globalconfig.SetHeaderTag(header, tag)
}
return true
}

// UserMonitoringConfig is used to configure what is used to identify a user.
Expand Down
16 changes: 14 additions & 2 deletions ddtrace/tracer/remote_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"

"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
)
Expand Down Expand Up @@ -64,6 +65,7 @@ func (t *tracer) onRemoteConfigUpdate(updates map[string]remoteconfig.ProductUpd
if !found {
return statuses
}
var telemConfigs []telemetry.Configuration
for path, raw := range u {
if raw == nil {
continue
Expand All @@ -76,8 +78,18 @@ func (t *tracer) onRemoteConfigUpdate(updates map[string]remoteconfig.ProductUpd
continue
}
statuses[path] = state.ApplyStatus{State: state.ApplyStateAcknowledged}
t.config.traceSampleRate.handleRC(c.LibConfig.SamplingRate)
t.config.headerAsTags.handleRC(c.LibConfig.HeaderTags.toSlice())
updated := t.config.traceSampleRate.handleRC(c.LibConfig.SamplingRate)
if updated {
telemConfigs = append(telemConfigs, t.config.traceSampleRate.toTelemetry())
}
updated = t.config.headerAsTags.handleRC(c.LibConfig.HeaderTags.toSlice())
if updated {
telemConfigs = append(telemConfigs, t.config.headerAsTags.toTelemetry())
}
}
if len(telemConfigs) > 0 {
log.Debug("Reporting %d configuration changes to telemetry", len(telemConfigs))
telemetry.GlobalClient.ConfigChange(telemConfigs)
}
return statuses
}
Expand Down
63 changes: 63 additions & 0 deletions ddtrace/tracer/remote_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import (

"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry/telemetrytest"

"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/stretchr/testify/require"
)

func TestOnRemoteConfigUpdate(t *testing.T) {
t.Run("RC sampling rate = 0.5 is applied and can be reverted", func(t *testing.T) {
telemetryClient := new(telemetrytest.MockClient)
defer telemetry.MockGlobalClient(telemetryClient)()

tracer, _, _, stop := startTestTracer(t)
defer stop()

Expand All @@ -30,16 +35,28 @@ func TestOnRemoteConfigUpdate(t *testing.T) {
s.Finish()
require.Equal(t, 0.5, s.Metrics[keyRulesSamplerAppliedRate])

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 1)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_sample_rate", Value: 0.5, Origin: "remote_config"}})

// Unset RC. Assert _dd.rule_psr is not set
input["APM_TRACING"] = remoteconfig.ProductUpdate{"path": []byte(`{"lib_config": {}}`)}
applyStatus = tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateAcknowledged, applyStatus["path"].State)
s = tracer.StartSpan("web.request").(*span)
s.Finish()
require.NotContains(t, keyRulesSamplerAppliedRate, s.Metrics)

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 2)
// Not calling AssertCalled because the configuration contains a math.NaN()
// as value which cannot be asserted see https://github.com/stretchr/testify/issues/624
})

t.Run("DD_TRACE_SAMPLE_RATE=0.1 and RC sampling rate = 0.2", func(t *testing.T) {
telemetryClient := new(telemetrytest.MockClient)
defer telemetry.MockGlobalClient(telemetryClient)()

t.Setenv("DD_TRACE_SAMPLE_RATE", "0.1")
tracer, _, _, stop := startTestTracer(t)
defer stop()
Expand All @@ -54,16 +71,27 @@ func TestOnRemoteConfigUpdate(t *testing.T) {
s.Finish()
require.Equal(t, 0.2, s.Metrics[keyRulesSamplerAppliedRate])

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 1)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_sample_rate", Value: 0.2, Origin: "remote_config"}})

// Unset RC. Assert _dd.rule_psr shows the previous sampling rate (0.1) is applied
input["APM_TRACING"] = remoteconfig.ProductUpdate{"path": []byte(`{"lib_config": {}}`)}
applyStatus = tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateAcknowledged, applyStatus["path"].State)
s = tracer.StartSpan("web.request").(*span)
s.Finish()
require.Equal(t, 0.1, s.Metrics[keyRulesSamplerAppliedRate])

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 2)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_sample_rate", Value: 0.1, Origin: ""}})
})

t.Run("RC header tags = X-Test-Header:my-tag-name is applied and can be reverted", func(t *testing.T) {
telemetryClient := new(telemetrytest.MockClient)
defer telemetry.MockGlobalClient(telemetryClient)()

tracer, _, _, stop := startTestTracer(t)
defer stop()

Expand All @@ -76,14 +104,24 @@ func TestOnRemoteConfigUpdate(t *testing.T) {
require.Equal(t, 1, globalconfig.HeaderTagsLen())
require.Equal(t, "my-tag-name", globalconfig.HeaderTag("X-Test-Header"))

telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 1)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_header_tags", Value: "X-Test-Header:my-tag-name", Origin: "remote_config"}})

// Unset RC. Assert header tags are not set
input["APM_TRACING"] = remoteconfig.ProductUpdate{"path": []byte(`{"lib_config": {}}`)}
applyStatus = tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateAcknowledged, applyStatus["path"].State)
require.Equal(t, 0, globalconfig.HeaderTagsLen())

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 2)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_header_tags", Value: "", Origin: ""}})
})

t.Run("DD_TRACE_HEADER_TAGS=X-Test-Header:my-tag-name-from-env and RC header tags = X-Test-Header:my-tag-name-from-rc", func(t *testing.T) {
telemetryClient := new(telemetrytest.MockClient)
defer telemetry.MockGlobalClient(telemetryClient)()

t.Setenv("DD_TRACE_HEADER_TAGS", "X-Test-Header:my-tag-name-from-env")
tracer, _, _, stop := startTestTracer(t)
defer stop()
Expand All @@ -97,15 +135,26 @@ func TestOnRemoteConfigUpdate(t *testing.T) {
require.Equal(t, 1, globalconfig.HeaderTagsLen())
require.Equal(t, "my-tag-name-from-rc", globalconfig.HeaderTag("X-Test-Header"))

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 1)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_header_tags", Value: "X-Test-Header:my-tag-name-from-rc", Origin: "remote_config"}})

// Unset RC. Assert global config shows the DD_TRACE_HEADER_TAGS header tag
input["APM_TRACING"] = remoteconfig.ProductUpdate{"path": []byte(`{"lib_config": {}}`)}
applyStatus = tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateAcknowledged, applyStatus["path"].State)
require.Equal(t, 1, globalconfig.HeaderTagsLen())
require.Equal(t, "my-tag-name-from-env", globalconfig.HeaderTag("X-Test-Header"))

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 2)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_header_tags", Value: "X-Test-Header:my-tag-name-from-env", Origin: ""}})
})

t.Run("In code header tags = X-Test-Header:my-tag-name-in-code and RC header tags = X-Test-Header:my-tag-name-from-rc", func(t *testing.T) {
telemetryClient := new(telemetrytest.MockClient)
defer telemetry.MockGlobalClient(telemetryClient)()

tracer, _, _, stop := startTestTracer(t, WithHeaderTags([]string{"X-Test-Header:my-tag-name-in-code"}))
defer stop()

Expand All @@ -118,15 +167,26 @@ func TestOnRemoteConfigUpdate(t *testing.T) {
require.Equal(t, 1, globalconfig.HeaderTagsLen())
require.Equal(t, "my-tag-name-from-rc", globalconfig.HeaderTag("X-Test-Header"))

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 1)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_header_tags", Value: "X-Test-Header:my-tag-name-from-rc", Origin: "remote_config"}})

// Unset RC. Assert global config shows the in-code header tag
input["APM_TRACING"] = remoteconfig.ProductUpdate{"path": []byte(`{"lib_config": {}}`)}
applyStatus = tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateAcknowledged, applyStatus["path"].State)
require.Equal(t, 1, globalconfig.HeaderTagsLen())
require.Equal(t, "my-tag-name-in-code", globalconfig.HeaderTag("X-Test-Header"))

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 2)
telemetryClient.AssertCalled(t, "ConfigChange", []telemetry.Configuration{{Name: "trace_header_tags", Value: "X-Test-Header:my-tag-name-in-code", Origin: ""}})
})

t.Run("Invalid payload", func(t *testing.T) {
telemetryClient := new(telemetrytest.MockClient)
defer telemetry.MockGlobalClient(telemetryClient)()

tracer, _, _, stop := startTestTracer(t)
defer stop()

Expand All @@ -136,5 +196,8 @@ func TestOnRemoteConfigUpdate(t *testing.T) {
applyStatus := tracer.onRemoteConfigUpdate(input)
require.Equal(t, state.ApplyStateError, applyStatus["path"].State)
require.NotEmpty(t, applyStatus["path"].Error)

// Telemetry
telemetryClient.AssertNumberOfCalls(t, "ConfigChange", 0)
})
}
15 changes: 13 additions & 2 deletions ddtrace/tracer/rules_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,25 @@ func (rs *traceRulesSampler) enabled() bool {
return len(rs.rules) > 0 || !math.IsNaN(rs.globalRate)
}

func (rs *traceRulesSampler) setGlobalSampleRate(rate float64) {
// setGlobalSampleRate sets the global sample rate to the given value.
// Returns whether the value was changed or not.
func (rs *traceRulesSampler) setGlobalSampleRate(rate float64) bool {
if rate < 0.0 || rate > 1.0 {
log.Warn("Ignoring trace sample rate %f: value out of range [0,1]", rate)
return
return false
}
rs.m.Lock()
defer rs.m.Unlock()
if math.IsNaN(rs.globalRate) && math.IsNaN(rate) {
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
// NaN is not considered equal to any number, including itself.
// It should be compared with math.IsNaN
return false
}
if rs.globalRate == rate {
return false
}
rs.globalRate = rate
return true
}

// apply uses the sampling rules to determine the sampling rate for the
Expand Down
Loading
Loading