Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/tracer: added UnmarshalJSON method to sampling rules #2563

Merged
merged 6 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ddtrace/tracer/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStartupLog(t *testing.T) {
tp.Ignore("appsec: ", telemetry.LogPrefix)
logStartup(tracer)
require.Len(t, tp.Logs(), 2)
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"100","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":true,"metadata":{"version":"v1"}},"feature_flags":\["discovery"\]}`, tp.Logs()[1])
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"100","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(1\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":true,"metadata":{"version":"v1"}},"feature_flags":\["discovery"\]}`, tp.Logs()[1])
})

t.Run("limit", func(t *testing.T) {
Expand Down Expand Up @@ -93,21 +93,21 @@ func TestStartupLog(t *testing.T) {
tp.Ignore("appsec: ", telemetry.LogPrefix)
logStartup(tracer)
require.Len(t, tp.Logs(), 2)
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"1000.001","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(0\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"configuredEnv","service":"configured.service","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":true,"analytics_enabled":true,"sample_rate":"0\.123000","sample_rate_limit":"1000.001","sampling_rules":\[{"service":"\^mysql\$","sample_rate":0\.75,"type":"trace\(1\)"}\],"sampling_rules_error":"","service_mappings":{"initial_service":"new_service"},"tags":{"runtime-id":"[^"]*","tag":"value","tag2":"NaN"},"runtime_metrics_enabled":true,"health_metrics_enabled":true,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"2.3.4","architecture":"[^"]*","global_service":"configured.service","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":false,"Stats":false,"DataStreams":false,"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
})

t.Run("errors", func(t *testing.T) {
assert := assert.New(t)
tp := new(log.RecordLogger)
t.Setenv("DD_TRACE_SAMPLING_RULES", `[{"service": "some.service","sample_rate": 0.234}, {"service": "other.service"}]`)
t.Setenv("DD_TRACE_SAMPLING_RULES", `[{"service": "some.service","sample_rate": 0.234}, {"service": "other.service","sample_rate": 2}]`)
tracer, _, _, stop := startTestTracer(t, WithLogger(tp))
defer stop()

tp.Reset()
tp.Ignore("appsec: ", telemetry.LogPrefix)
logStartup(tracer)
require.Len(t, tp.Logs(), 2)
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"100","sampling_rules":\[{"service":"\^some\\\\\.service\$","sample_rate":0\.234,"type":"trace\(0\)"}\],"sampling_rules_error":"\\n\\tat index 1: rate not provided","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"DataStreams":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
assert.Regexp(logPrefixRegexp+` INFO: DATADOG TRACER CONFIGURATION {"date":"[^"]*","os_name":"[^"]*","os_version":"[^"]*","version":"[^"]*","lang":"Go","lang_version":"[^"]*","env":"","service":"tracer\.test(\.exe)?","agent_url":"http://localhost:9/v0.4/traces","agent_error":"Post .*","debug":false,"analytics_enabled":false,"sample_rate":"NaN","sample_rate_limit":"100","sampling_rules":\[{"service":"\^some\\\\\.service\$","sample_rate":0\.234,"type":"trace\(1\)"}\],"sampling_rules_error":"\\n\\tat index 1: ignoring rule {Service:other.service Rate:2}: rate is out of \[0\.0, 1\.0] range","service_mappings":null,"tags":{"runtime-id":"[^"]*"},"runtime_metrics_enabled":false,"health_metrics_enabled":false,"profiler_code_hotspots_enabled":((false)|(true)),"profiler_endpoints_enabled":((false)|(true)),"dd_version":"","architecture":"[^"]*","global_service":"","lambda_mode":"false","appsec":((true)|(false)),"agent_features":{"DropP0s":((true)|(false)),"Stats":((true)|(false)),"DataStreams":((true)|(false)),"StatsdPort":0},"integrations":{.*},"partial_flush_enabled":false,"partial_flush_min_spans":1000,"orchestrion":{"enabled":false},"feature_flags":\[\]}`, tp.Logs()[1])
})

t.Run("lambda", func(t *testing.T) {
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestLogSamplingRules(t *testing.T) {
defer stop()

assert.Len(tp.Logs(), 1)
assert.Regexp(logPrefixRegexp+` WARN: DIAGNOSTICS Error\(s\) parsing sampling rules: found errors:\n\tat index 1: rate not provided\n\tat index 3: rate not provided\n\tat index 4: ignoring rule {Service: Name: Rate:9\.10 MaxPerSecond:0 Resource: Tags:map\[\]}: rate is out of \[0\.0, 1\.0] range$`, tp.Logs()[0])
assert.Regexp(logPrefixRegexp+` WARN: DIAGNOSTICS Error\(s\) parsing sampling rules: found errors:\n\tat index 4: ignoring rule {Rate:9\.10}: rate is out of \[0\.0, 1\.0] range$`, tp.Logs()[0])
}

func TestLogAgentReachable(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/tracer/remote_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
package tracer

import (
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/remoteconfig"
Expand Down
145 changes: 93 additions & 52 deletions ddtrace/tracer/rules_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ func (sr *SamplingRule) match(s *span) bool {
type SamplingRuleType int

const (
SamplingRuleUndefined SamplingRuleType = 0

// SamplingRuleTrace specifies a sampling rule that applies to the entire trace if any spans satisfy the criteria.
// If a sampling rule is of type SamplingRuleTrace, such rule determines the sampling rate to apply
// to trace spans. If a span matches that rule, it will impact the trace sampling decision.
Expand Down Expand Up @@ -138,34 +140,38 @@ func (sr SamplingRuleType) String() string {
// to spans that match the service name provided.
func ServiceRule(service string, rate float64) SamplingRule {
return SamplingRule{
Service: globMatch(service),
Rate: rate,
Service: globMatch(service),
ruleType: SamplingRuleTrace,
Rate: rate,
}
}

// NameRule returns a SamplingRule that applies the provided sampling rate
// to spans that match the operation name provided.
func NameRule(name string, rate float64) SamplingRule {
return SamplingRule{
Name: globMatch(name),
Rate: rate,
Name: globMatch(name),
ruleType: SamplingRuleTrace,
Rate: rate,
}
}

// NameServiceRule returns a SamplingRule that applies the provided sampling rate
// to spans matching both the operation and service names provided.
func NameServiceRule(name string, service string, rate float64) SamplingRule {
return SamplingRule{
Service: globMatch(service),
Name: globMatch(name),
Rate: rate,
Service: globMatch(service),
Name: globMatch(name),
ruleType: SamplingRuleTrace,
Rate: rate,
}
}

// RateRule returns a SamplingRule that applies the provided sampling rate to all spans.
func RateRule(rate float64) SamplingRule {
return SamplingRule{
Rate: rate,
Rate: rate,
ruleType: SamplingRuleTrace,
}
}

Expand Down Expand Up @@ -589,74 +595,106 @@ func samplingRulesFromEnv() (trace, span []SamplingRule, err error) {
return trace, span, err
}

// UnmarshalJSON implements json.Unmarshaler for a single sampling rule.
// An empty payload is a no-op and leaves the receiver untouched. The raw
// rule is passed through validateRules with SamplingRuleUndefined so that
// an explicit "type" field in the JSON, if present, determines the rule type.
func (sr *SamplingRule) UnmarshalJSON(b []byte) error {
	if len(b) == 0 {
		return nil
	}
	var raw jsonRule
	if err := json.Unmarshal(b, &raw); err != nil {
		return err
	}
	validated, err := validateRules([]jsonRule{raw}, SamplingRuleUndefined)
	if err != nil {
		return err
	}
	*sr = validated[0]
	return nil
}

// jsonRule is the JSON wire representation of a sampling rule, decoded from
// the sampling-rules environment variables (see unmarshalSamplingRules) or
// via SamplingRule.UnmarshalJSON.
type jsonRule struct {
Service string `json:"service"` // glob pattern matched against the span's service name
Name string `json:"name"` // glob pattern matched against the span's operation name
Rate json.Number `json:"sample_rate"` // json.Number so a missing rate ("") is distinguishable from an explicit 0
MaxPerSecond float64 `json:"max_per_second"` // optional rate limit fed to newSingleSpanRateLimiter — presumably 0 means "no limit"; confirm in limiter
Resource string `json:"resource"` // glob pattern matched against the span's resource name
Tags map[string]string `json:"tags"` // tag name -> glob pattern the tag value must match
Type *SamplingRuleType `json:"type,omitempty"` // explicit rule type; nil means the caller-supplied default type applies
}

// String renders the rule in a compact "{Key:value ...}" form, listing only
// fields that are set. It is used to identify an offending rule in
// rule-validation error messages, so its exact output is load-bearing.
func (j jsonRule) String() string {
	parts := make([]string, 0, 7)
	add := func(p string) { parts = append(parts, p) }
	if j.Service != "" {
		add("Service:" + j.Service)
	}
	if j.Name != "" {
		add("Name:" + j.Name)
	}
	if j.Rate != "" {
		add("Rate:" + string(j.Rate))
	}
	if j.MaxPerSecond != 0 {
		add(fmt.Sprintf("MaxPerSecond:%f", j.MaxPerSecond))
	}
	if j.Resource != "" {
		add("Resource:" + j.Resource)
	}
	if len(j.Tags) != 0 {
		add(fmt.Sprintf("Tags:%v", j.Tags))
	}
	if j.Type != nil {
		// NOTE(review): "Type: " carries a space unlike the other keys —
		// kept verbatim since callers/tests may match on this exact text.
		add(fmt.Sprintf("Type: %v", *j.Type))
	}
	return "{" + strings.Join(parts, " ") + "}"
}

// unmarshalSamplingRules unmarshals JSON from b and returns the sampling rules found, attributing
// the type spanType to them. Any errors encountered are returned.
func unmarshalSamplingRules(b []byte, spanType SamplingRuleType) ([]SamplingRule, error) {
if len(b) == 0 {
return nil, nil
}
var jsonRules []struct {
Service string `json:"service"`
Name string `json:"name"`
Rate json.Number `json:"sample_rate"`
MaxPerSecond float64 `json:"max_per_second"`
Resource string `json:"resource"`
Tags map[string]string `json:"tags"`
}
var jsonRules []jsonRule
// if the JSON is an array, unmarshal it as an array of rules
err := json.Unmarshal(b, &jsonRules)
if err != nil {
return nil, fmt.Errorf("error unmarshalling JSON: %v", err)
}
rules := make([]SamplingRule, 0, len(jsonRules))
return validateRules(jsonRules, spanType)
}

func validateRules(jsonRules []jsonRule, spanType SamplingRuleType) ([]SamplingRule, error) {
var errs []string
rules := make([]SamplingRule, 0, len(jsonRules))
for i, v := range jsonRules {
if v.Rate == "" {
if spanType == SamplingRuleSpan {
v.Rate = "1"
} else {
errs = append(errs, fmt.Sprintf("at index %d: rate not provided", i))
continue
}
v.Rate = "1"
}
if v.Type != nil && *v.Type != spanType {
spanType = *v.Type
}
rate, err := v.Rate.Float64()
if err != nil {
errs = append(errs, fmt.Sprintf("at index %d: %v", i, err))
continue
}
if rate < 0.0 || rate > 1.0 {
errs = append(errs, fmt.Sprintf("at index %d: ignoring rule %+v: rate is out of [0.0, 1.0] range", i, v))
errs = append(errs, fmt.Sprintf("at index %d: ignoring rule %s: rate is out of [0.0, 1.0] range", i, v.String()))
continue
}
tagGlobs := make(map[string]*regexp.Regexp, len(v.Tags))
for k, g := range v.Tags {
tagGlobs[k] = globMatch(g)
}
switch spanType {
case SamplingRuleSpan:
rules = append(rules, SamplingRule{
Service: globMatch(v.Service),
Name: globMatch(v.Name),
Rate: rate,
MaxPerSecond: v.MaxPerSecond,
Resource: globMatch(v.Resource),
Tags: tagGlobs,
limiter: newSingleSpanRateLimiter(v.MaxPerSecond),
ruleType: SamplingRuleSpan,
})
case SamplingRuleTrace:
if v.Rate == "" {
errs = append(errs, fmt.Sprintf("at index %d: rate not provided", i))
continue
}
rules = append(rules, SamplingRule{
Service: globMatch(v.Service),
Name: globMatch(v.Name),
Rate: rate,
Resource: globMatch(v.Resource),
Tags: tagGlobs,
ruleType: SamplingRuleTrace,
})
}
rules = append(rules, SamplingRule{
Service: globMatch(v.Service),
Name: globMatch(v.Name),
Rate: rate,
MaxPerSecond: v.MaxPerSecond,
Resource: globMatch(v.Resource),
Tags: tagGlobs,
ruleType: spanType,
limiter: newSingleSpanRateLimiter(v.MaxPerSecond),
})
}
if len(errs) != 0 {
return rules, fmt.Errorf("%s", strings.Join(errs, "\n\t"))
Expand All @@ -672,7 +710,7 @@ func (sr *SamplingRule) MarshalJSON() ([]byte, error) {
Resource string `json:"resource,omitempty"`
Rate float64 `json:"sample_rate"`
Tags map[string]string `json:"tags,omitempty"`
Type string `json:"type"`
Type *string `json:"type,omitempty"`
MaxPerSecond *float64 `json:"max_per_second,omitempty"`
}{}
if sr.Service != nil {
Expand All @@ -688,7 +726,10 @@ func (sr *SamplingRule) MarshalJSON() ([]byte, error) {
s.Resource = sr.Resource.String()
}
s.Rate = sr.Rate
s.Type = fmt.Sprintf("%v(%d)", sr.ruleType.String(), sr.ruleType)
if v := sr.ruleType.String(); v != "" {
t := fmt.Sprintf("%v(%d)", v, sr.ruleType)
s.Type = &t
}
s.Tags = make(map[string]string, len(sr.Tags))
for k, v := range sr.Tags {
if v != nil {
Expand Down
Loading
Loading