Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trace: use /v0.4 agent endpoint and priority sampling #27

Merged
merged 5 commits into from Jan 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Expand Up @@ -15,4 +15,10 @@ provides some simple usage examples.

### Disclaimer

For trace, this package is considered experiemental and comes with limitations. More specifically, due to the differences in operation between Datadog and OpenCensus, statistics (such as percentiles) seen in the Datadog application will be inaccurate and will be limited to only sampled traces. It is not advised to rely on these numbers to assert accurate system behaviour. We are aware of the issue and the situation could change in the near future.
In order to get accurate Datadog APM statistics and full distributed tracing, trace sampling must be done by the Datadog stack. For this to be possible, OpenCensus must be notified to forward all traces to our exporter:

```go
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
```

This change simply means that Datadog will handle sampling. It does not mean that all traces will be sampled.
84 changes: 84 additions & 0 deletions sampler.go
@@ -0,0 +1,84 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadog.com/).
// Copyright 2018 Datadog, Inc.

package datadog

import (
"encoding/json"
"io"
"math"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
)

// constants used for the Knuth hashing, same as agent.
const knuthFactor = uint64(1111111111111111111)

// sampledByRate verifies if the number n should be sampled at the specified
// rate.
func sampledByRate(n uint64, rate float64) bool {
if rate < 1 {
return n*knuthFactor < uint64(rate*math.MaxUint64)
}
return true
}

// prioritySampler holds a set of per-service sampling rates and applies
// them to spans.
type prioritySampler struct {
mu sync.RWMutex
rates map[string]float64
defaultRate float64
}

func newPrioritySampler() *prioritySampler {
return &prioritySampler{
rates: make(map[string]float64),
defaultRate: 1.,
}
}

// readRatesJSON will try to read the rates as JSON from the given io.ReadCloser.
func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error {
var payload struct {
Rates map[string]float64 `json:"rate_by_service"`
}
if err := json.NewDecoder(rc).Decode(&payload); err != nil {
return err
}
rc.Close()
const defaultRateKey = "service:,env:"
ps.mu.Lock()
defer ps.mu.Unlock()
ps.rates = payload.Rates
if v, ok := ps.rates[defaultRateKey]; ok {
ps.defaultRate = v
delete(ps.rates, defaultRateKey)
}
return nil
}

// getRate returns the sampling rate to be used for the given span.
func (ps *prioritySampler) getRate(spn *ddSpan) float64 {
key := "service:" + spn.Service + ",env:" + spn.Meta[ext.Environment]
ps.mu.RLock()
defer ps.mu.RUnlock()
if rate, ok := ps.rates[key]; ok {
return rate
}
return ps.defaultRate
}

// applyPriority applies sampling priority to the given ddSpan.
func (ps *prioritySampler) applyPriority(spn *ddSpan) {
rate := ps.getRate(spn)
if sampledByRate(spn.TraceID, rate) {
spn.Metrics[keySamplingPriority] = ext.PriorityAutoKeep
} else {
spn.Metrics[keySamplingPriority] = ext.PriorityAutoReject
}
spn.Metrics[keySamplingPriorityRate] = rate
}
167 changes: 167 additions & 0 deletions sampler_test.go
@@ -0,0 +1,167 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadog.com/).
// Copyright 2018 Datadog, Inc.

package datadog

import (
"io/ioutil"
"math"
"strings"
"sync"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"

"github.com/stretchr/testify/assert"
)

func TestPrioritySampler(t *testing.T) {
// create a new span with given service/env
mkSpan := func(svc, env string) *ddSpan {
s := &ddSpan{Service: svc, Meta: map[string]string{}}
if env != "" {
s.Meta["env"] = env
}
return s
}

t.Run("mkspan", func(t *testing.T) {
assert := assert.New(t)
s := mkSpan("my-service", "my-env")
assert.Equal("my-service", s.Service)
assert.Equal("my-env", s.Meta[ext.Environment])

s = mkSpan("my-service2", "")
assert.Equal("my-service2", s.Service)
_, ok := s.Meta[ext.Environment]
assert.False(ok)
})

t.Run("ops", func(t *testing.T) {
ps := newPrioritySampler()
assert := assert.New(t)

type key struct{ service, env string }
for _, tt := range []struct {
in string
out map[key]float64
}{
{
in: `{}`,
out: map[key]float64{
key{"some-service", ""}: 1,
key{"obfuscate.http", "none"}: 1,
},
},
{
in: `{
"rate_by_service":{
"service:,env:":0.8,
"service:obfuscate.http,env:":0.9,
"service:obfuscate.http,env:none":0.9
}
}`,
out: map[key]float64{
key{"obfuscate.http", ""}: 0.9,
key{"obfuscate.http", "none"}: 0.9,
key{"obfuscate.http", "other"}: 0.8,
key{"some-service", ""}: 0.8,
},
},
{
in: `{
"rate_by_service":{
"service:my-service,env:":0.2,
"service:my-service,env:none":0.2
}
}`,
out: map[key]float64{
key{"my-service", ""}: 0.2,
key{"my-service", "none"}: 0.2,
key{"obfuscate.http", ""}: 0.8,
key{"obfuscate.http", "none"}: 0.8,
key{"obfuscate.http", "other"}: 0.8,
key{"some-service", ""}: 0.8,
},
},
} {
assert.NoError(ps.readRatesJSON(ioutil.NopCloser(strings.NewReader(tt.in))))
for k, v := range tt.out {
assert.Equal(v, ps.getRate(mkSpan(k.service, k.env)), k)
}
}
})

t.Run("race", func(t *testing.T) {
ps := newPrioritySampler()
assert := assert.New(t)

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 500; i++ {
assert.NoError(ps.readRatesJSON(
ioutil.NopCloser(strings.NewReader(
`{
"rate_by_service":{
"service:,env:":0.8,
"service:obfuscate.http,env:none":0.9
}
}`,
)),
))
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 500; i++ {
ps.getRate(mkSpan("obfuscate.http", "none"))
ps.getRate(mkSpan("other.service", "none"))
}
}()

wg.Wait()
})

t.Run("apply", func(t *testing.T) {
ps := newPrioritySampler()
assert := assert.New(t)
assert.NoError(ps.readRatesJSON(
ioutil.NopCloser(strings.NewReader(
`{
"rate_by_service":{
"service:obfuscate.http,env:":0.5,
"service:obfuscate.http,env:none":0.5
}
}`,
)),
))

testSpan1 := &ddSpan{
Name: "http.request",
Metrics: map[string]float64{},
}
testSpan1.Service = "obfuscate.http"
testSpan1.TraceID = math.MaxUint64 - (math.MaxUint64 / 4)

ps.applyPriority(testSpan1)
assert.EqualValues(ext.PriorityAutoKeep, testSpan1.Metrics[keySamplingPriority])
assert.EqualValues(0.5, testSpan1.Metrics[keySamplingPriorityRate])

testSpan1.TraceID = math.MaxUint64 - (math.MaxUint64 / 3)
ps.applyPriority(testSpan1)
assert.EqualValues(ext.PriorityAutoReject, testSpan1.Metrics[keySamplingPriority])
assert.EqualValues(0.5, testSpan1.Metrics[keySamplingPriorityRate])

testSpan1.Service = "other-service"
testSpan1.TraceID = 1
assert.EqualValues(ext.PriorityAutoReject, testSpan1.Metrics[keySamplingPriority])
assert.EqualValues(0.5, testSpan1.Metrics[keySamplingPriorityRate])
})
}
15 changes: 8 additions & 7 deletions span.go
Expand Up @@ -54,7 +54,7 @@ func (e *traceExporter) convertSpan(s *trace.SpanData) *ddSpan {
Service: e.opts.Service,
Start: startNano,
Duration: s.EndTime.UnixNano() - startNano,
Metrics: map[string]float64{samplingPriorityKey: ext.PriorityAutoKeep},
Metrics: map[string]float64{},
Meta: map[string]string{},
}
if s.ParentSpanID != (trace.SpanID{}) {
Expand All @@ -66,7 +66,7 @@ func (e *traceExporter) convertSpan(s *trace.SpanData) *ddSpan {
case trace.SpanKindServer:
span.Type = "server"
}
statusKey := statusDescriptionKey
statusKey := keyStatusDescription
if code := s.Status.Code; code != 0 {
statusKey = ext.ErrorMsg
span.Error = 1
Expand All @@ -85,9 +85,10 @@ func (e *traceExporter) convertSpan(s *trace.SpanData) *ddSpan {
}

const (
samplingPriorityKey = "_sampling_priority_v1"
statusDescriptionKey = "opencensus.status_description"
spanNameKey = "span.name"
keySamplingPriority = "_sampling_priority_v1"
keyStatusDescription = "opencensus.status_description"
keySpanName = "span.name"
keySamplingPriorityRate = "_sampling_priority_rate_v1"
gbbr marked this conversation as resolved.
Show resolved Hide resolved
)

func setTag(s *ddSpan, key string, val interface{}) {
Expand All @@ -107,7 +108,7 @@ func setTag(s *ddSpan, key string, val interface{}) {
}
case int64:
if key == ext.SamplingPriority {
s.Metrics[samplingPriorityKey] = float64(v)
s.Metrics[keySamplingPriority] = float64(v)
} else {
s.Metrics[key] = float64(v)
}
Expand All @@ -126,7 +127,7 @@ func setStringTag(s *ddSpan, key, v string) {
s.Resource = v
case ext.SpanType:
s.Type = v
case spanNameKey:
case keySpanName:
s.Name = v
default:
s.Meta[key] = v
Expand Down
29 changes: 11 additions & 18 deletions span_test.go
Expand Up @@ -53,15 +53,12 @@ var spanPairs = map[string]struct {
Resource: "/a/b",
Start: testStartTime.UnixNano(),
Duration: testEndTime.UnixNano() - testStartTime.UnixNano(),
Metrics: map[string]float64{
"int64": 1,
samplingPriorityKey: ext.PriorityAutoKeep,
},
Service: "my-service",
Metrics: map[string]float64{"int64": 1},
Service: "my-service",
Meta: map[string]string{
"bool": "true",
"str": "abc",
statusDescriptionKey: "status-msg",
keyStatusDescription: "status-msg",
},
},
},
Expand Down Expand Up @@ -89,11 +86,9 @@ var spanPairs = map[string]struct {
Resource: "/a/b",
Start: testStartTime.UnixNano(),
Duration: testEndTime.UnixNano() - testStartTime.UnixNano(),
Metrics: map[string]float64{
samplingPriorityKey: ext.PriorityAutoKeep,
},
Service: "my-service",
Meta: map[string]string{},
Metrics: map[string]float64{},
Service: "my-service",
Meta: map[string]string{},
},
},
"error": {
Expand Down Expand Up @@ -121,11 +116,9 @@ var spanPairs = map[string]struct {
Resource: "/a/b",
Start: testStartTime.UnixNano(),
Duration: testEndTime.UnixNano() - testStartTime.UnixNano(),
Metrics: map[string]float64{
samplingPriorityKey: ext.PriorityAutoKeep,
},
Error: 1,
Service: "my-service",
Metrics: map[string]float64{},
Error: 1,
Service: "my-service",
Meta: map[string]string{
ext.ErrorMsg: "status-msg",
ext.ErrorType: "cancelled",
Expand Down Expand Up @@ -161,7 +154,7 @@ var spanPairs = map[string]struct {
Start: testStartTime.UnixNano(),
Duration: testEndTime.UnixNano() - testStartTime.UnixNano(),
Metrics: map[string]float64{
samplingPriorityKey: ext.PriorityUserReject,
keySamplingPriority: ext.PriorityUserReject,
},
Service: "other-service",
Error: 1,
Expand Down Expand Up @@ -280,7 +273,7 @@ func TestSetTag(t *testing.T) {
setTag(span, "key", int64(12))
eq(span.Metrics["key"], float64(12))
setTag(span, ext.SamplingPriority, int64(1))
eq(span.Metrics[samplingPriorityKey], float64(1))
eq(span.Metrics[keySamplingPriority], float64(1))
})

t.Run("default", func(t *testing.T) {
Expand Down