Skip to content

Commit

Permalink
ddtrace/tracer: added UnmarshalJSON method to sampling rules (#2563)
Browse files Browse the repository at this point in the history
  • Loading branch information
dianashevchenko committed Feb 26, 2024
1 parent 5d3b29b commit 5b310de
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 80 deletions.
10 changes: 5 additions & 5 deletions ddtrace/tracer/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStartupLog(t *testing.T) {
tp.Ignore("appsec: ", telemetry.LogPrefix)
logStartup(tracer)
require.Len(t, tp.Logs(), 2)
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"100","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":true,"metadata":{"version":"v1"}},"feature_flags":\["discovery"\]}`, tp.Logs()[1])
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"100","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(1\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":true,"metadata":{"version":"v1"}},"feature_flags":\["discovery"\]}`, tp.Logs()[1])
})

t.Run("limit", func(t *testing.T) {
Expand Down Expand Up @@ -93,21 +93,21 @@ func TestStartupLog(t *testing.T) {
tp.Ignore("appsec: ", telemetry.LogPrefix)
logStartup(tracer)
require.Len(t, tp.Logs(), 2)
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"1000.001","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"1000.001","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(1\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
})

t.Run("errors", func(t *testing.T) {
assert := assert.New(t)
tp := new(log.RecordLogger)
t.Setenv("DD_TRACE_SAMPLING_RULES", `[{"service": "some.service","sample_rate": 0.234}, {"service": "other.service"}]`)
t.Setenv("DD_TRACE_SAMPLING_RULES", `[{"service": "some.service","sample_rate": 0.234}, {"service": "other.service","sample_rate": 2}]`)
tracer, _, _, stop := startTestTracer(t, WithLogger(tp))
defer stop()

tp.Reset()
tp.Ignore("appsec: ", telemetry.LogPrefix)
logStartup(tracer)
require.Len(t, tp.Logs(), 2)
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"100","sampling_rules":\[{"service":"\^some\\\\\.service\$","sample_rate":0\.234,"type":"trace\(0\)"}\],"sampling_rules_error":"\\n\\tat index 1: rate not provided","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"DataStreams":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"100","sampling_rules":\[{"service":"\^some\\\\\.service\$","sample_rate":0\.234,"type":"trace\(1\)"}\],"sampling_rules_error":"\\n\\tat index 1: ignoring rule {Service:other.service Rate:2}: rate is out of \[0\.0, 1\.0] range","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"DataStreams":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
})

t.Run("lambda", func(t *testing.T) {
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestLogSamplingRules(t *testing.T) {
defer stop()

assert.Len(tp.Logs(), 1)
assert.Regexp(logPrefixRegexp+` WARN: DIAGNOSTICS Error\(s\) parsing sampling rules: found errors:\n\tat index 1: rate not provided\n\tat index 3: rate not provided\n\tat index 4: ignoring rule {Service: Name: Rate:9\.10 MaxPerSecond:0 Resource: Tags:map\[\]}: rate is out of \[0\.0, 1\.0] range$`, tp.Logs()[0])
assert.Regexp(logPrefixRegexp+` WARN: DIAGNOSTICS Error\(s\) parsing sampling rules: found errors:\n\tat index 4: ignoring rule {Rate:9\.10}: rate is out of \[0\.0, 1\.0] range$`, tp.Logs()[0])
}

func TestLogAgentReachable(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/tracer/remote_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
package tracer

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig"
Expand Down
145 changes: 93 additions & 52 deletions ddtrace/tracer/rules_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func (sr *SamplingRule) match(s *span) bool {
type SamplingRuleType int

const (
SamplingRuleUndefined SamplingRuleType = 0

// SamplingRuleTrace specifies a sampling rule that applies to the entire trace if any spans satisfy the criteria.
// If a sampling rule is of type SamplingRuleTrace, such rule determines the sampling rate to apply
// to trace spans. If a span matches that rule, it will impact the trace sampling decision.
Expand Down Expand Up @@ -138,34 +140,38 @@ func (sr SamplingRuleType) String() string {
// to spans that match the service name provided.
func ServiceRule(service string, rate float64) SamplingRule {
return SamplingRule{
Service: globMatch(service),
Rate: rate,
Service: globMatch(service),
ruleType: SamplingRuleTrace,
Rate: rate,
}
}

// NameRule returns a SamplingRule that applies the provided sampling rate
// to spans that match the operation name provided.
func NameRule(name string, rate float64) SamplingRule {
return SamplingRule{
Name: globMatch(name),
Rate: rate,
Name: globMatch(name),
ruleType: SamplingRuleTrace,
Rate: rate,
}
}

// NameServiceRule returns a SamplingRule that applies the provided sampling rate
// to spans matching both the operation and service names provided.
func NameServiceRule(name string, service string, rate float64) SamplingRule {
return SamplingRule{
Service: globMatch(service),
Name: globMatch(name),
Rate: rate,
Service: globMatch(service),
Name: globMatch(name),
ruleType: SamplingRuleTrace,
Rate: rate,
}
}

// RateRule returns a SamplingRule that applies the provided sampling rate to all spans.
func RateRule(rate float64) SamplingRule {
return SamplingRule{
Rate: rate,
Rate: rate,
ruleType: SamplingRuleTrace,
}
}

Expand Down Expand Up @@ -589,74 +595,106 @@ func samplingRulesFromEnv() (trace, span []SamplingRule, err error) {
return trace, span, err
}

func (sr *SamplingRule) UnmarshalJSON(b []byte) error {
if len(b) == 0 {
return nil
}
var v jsonRule
if err := json.Unmarshal(b, &v); err != nil {
return err
}
rules, err := validateRules([]jsonRule{v}, SamplingRuleUndefined)
if err != nil {
return err
}
*sr = rules[0]
return nil
}

type jsonRule struct {
Service string `json:"service"`
Name string `json:"name"`
Rate json.Number `json:"sample_rate"`
MaxPerSecond float64 `json:"max_per_second"`
Resource string `json:"resource"`
Tags map[string]string `json:"tags"`
Type *SamplingRuleType `json:"type,omitempty"`
}

func (j jsonRule) String() string {
var s []string
if j.Service != "" {
s = append(s, fmt.Sprintf("Service:%s", j.Service))
}
if j.Name != "" {
s = append(s, fmt.Sprintf("Name:%s", j.Name))
}
if j.Rate != "" {
s = append(s, fmt.Sprintf("Rate:%s", j.Rate))
}
if j.MaxPerSecond != 0 {
s = append(s, fmt.Sprintf("MaxPerSecond:%f", j.MaxPerSecond))
}
if j.Resource != "" {
s = append(s, fmt.Sprintf("Resource:%s", j.Resource))
}
if len(j.Tags) != 0 {
s = append(s, fmt.Sprintf("Tags:%v", j.Tags))
}
if j.Type != nil {
s = append(s, fmt.Sprintf("Type: %v", *j.Type))
}
return fmt.Sprintf("{%s}", strings.Join(s, " "))
}

// unmarshalSamplingRules unmarshals JSON from b and returns the sampling rules found, attributing
// the type t to them. If any errors are occurred, they are returned.
func unmarshalSamplingRules(b []byte, spanType SamplingRuleType) ([]SamplingRule, error) {
if len(b) == 0 {
return nil, nil
}
var jsonRules []struct {
Service string `json:"service"`
Name string `json:"name"`
Rate json.Number `json:"sample_rate"`
MaxPerSecond float64 `json:"max_per_second"`
Resource string `json:"resource"`
Tags map[string]string `json:"tags"`
}
var jsonRules []jsonRule
// if the JSON is an array, unmarshal it as an array of rules
err := json.Unmarshal(b, &jsonRules)
if err != nil {
return nil, fmt.Errorf("error unmarshalling JSON: %v", err)
}
rules := make([]SamplingRule, 0, len(jsonRules))
return validateRules(jsonRules, spanType)
}

func validateRules(jsonRules []jsonRule, spanType SamplingRuleType) ([]SamplingRule, error) {
var errs []string
rules := make([]SamplingRule, 0, len(jsonRules))
for i, v := range jsonRules {
if v.Rate == "" {
if spanType == SamplingRuleSpan {
v.Rate = "1"
} else {
errs = append(errs, fmt.Sprintf("at index %d: rate not provided", i))
continue
}
v.Rate = "1"
}
if v.Type != nil && *v.Type != spanType {
spanType = *v.Type
}
rate, err := v.Rate.Float64()
if err != nil {
errs = append(errs, fmt.Sprintf("at index %d: %v", i, err))
continue
}
if rate < 0.0 || rate > 1.0 {
errs = append(errs, fmt.Sprintf("at index %d: ignoring rule %+v: rate is out of [0.0, 1.0] range", i, v))
errs = append(errs, fmt.Sprintf("at index %d: ignoring rule %s: rate is out of [0.0, 1.0] range", i, v.String()))
continue
}
tagGlobs := make(map[string]*regexp.Regexp, len(v.Tags))
for k, g := range v.Tags {
tagGlobs[k] = globMatch(g)
}
switch spanType {
case SamplingRuleSpan:
rules = append(rules, SamplingRule{
Service: globMatch(v.Service),
Name: globMatch(v.Name),
Rate: rate,
MaxPerSecond: v.MaxPerSecond,
Resource: globMatch(v.Resource),
Tags: tagGlobs,
limiter: newSingleSpanRateLimiter(v.MaxPerSecond),
ruleType: SamplingRuleSpan,
})
case SamplingRuleTrace:
if v.Rate == "" {
errs = append(errs, fmt.Sprintf("at index %d: rate not provided", i))
continue
}
rules = append(rules, SamplingRule{
Service: globMatch(v.Service),
Name: globMatch(v.Name),
Rate: rate,
Resource: globMatch(v.Resource),
Tags: tagGlobs,
ruleType: SamplingRuleTrace,
})
}
rules = append(rules, SamplingRule{
Service: globMatch(v.Service),
Name: globMatch(v.Name),
Rate: rate,
MaxPerSecond: v.MaxPerSecond,
Resource: globMatch(v.Resource),
Tags: tagGlobs,
ruleType: spanType,
limiter: newSingleSpanRateLimiter(v.MaxPerSecond),
})
}
if len(errs) != 0 {
return rules, fmt.Errorf("%s", strings.Join(errs, "\n\t"))
Expand All @@ -672,7 +710,7 @@ func (sr *SamplingRule) MarshalJSON() ([]byte, error) {
Resource string `json:"resource,omitempty"`
Rate float64 `json:"sample_rate"`
Tags map[string]string `json:"tags,omitempty"`
Type string `json:"type"`
Type *string `json:"type,omitempty"`
MaxPerSecond *float64 `json:"max_per_second,omitempty"`
}{}
if sr.Service != nil {
Expand All @@ -688,7 +726,10 @@ func (sr *SamplingRule) MarshalJSON() ([]byte, error) {
s.Resource = sr.Resource.String()
}
s.Rate = sr.Rate
s.Type = fmt.Sprintf("%v(%d)", sr.ruleType.String(), sr.ruleType)
if v := sr.ruleType.String(); v != "" {
t := fmt.Sprintf("%v(%d)", v, sr.ruleType)
s.Type = &t
}
s.Tags = make(map[string]string, len(sr.Tags))
for k, v := range sr.Tags {
if v != nil {
Expand Down
Loading

0 comments on commit 5b310de

Please sign in to comment.