Skip to content

Commit

Permalink
ddtrace/tracer: priority sampling on by default (#373)
Browse files Browse the repository at this point in the history
This change enables priority sampling by default, as well as adds some slight performance improvements to sampling, additional tests and fixes a regression where extracted span weren't sampled.
  • Loading branch information
cgilmour authored and gbbr committed Dec 24, 2018
1 parent 5a81392 commit 7fb2bce
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 74 deletions.
12 changes: 5 additions & 7 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ type config struct {
// sampler specifies the sampler that will be used for sampling traces.
sampler Sampler

// prioritySampling will be non-nil when priority sampling is enabled.
prioritySampling *prioritySampler

// agentAddr specifies the hostname and of the agent where the traces
// are sent to.
agentAddr string
Expand Down Expand Up @@ -52,13 +49,14 @@ func defaults(c *config) {
c.agentAddr = defaultAddress
}

// WithPrioritySampling enables priority sampling on the active tracer instance. When using
// distributed tracing, this option must be enabled in order to get all the parts of a distributed
// trace sampled. To learn more about priority sampling, please visit:
// WithPrioritySampling is deprecated, and priority sampling is enabled by default.
// When using distributed tracing, the priority sampling value is propagated in order to
// get all the parts of a distributed trace sampled.
// To learn more about priority sampling, please visit:
// https://docs.datadoghq.com/tracing/getting_further/trace_sampling_and_storage/#priority-sampling-for-distributed-tracing
func WithPrioritySampling() StartOption {
return func(c *config) {
c.prioritySampling = newPrioritySampler()
// This is now enabled by default.
}
}

Expand Down
4 changes: 4 additions & 0 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ const knuthFactor = uint64(1111111111111111111)

// Sample returns true if the given span should be sampled.
func (r *rateSampler) Sample(spn ddtrace.Span) bool {
if r.rate == 1 {
// fast path
return true
}
s, ok := spn.(*span)
if !ok {
return false
Expand Down
15 changes: 1 addition & 14 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,7 @@ func TestRateSampler(t *testing.T) {
assert.True(NewRateSampler(1).Sample(newBasicSpan("test")))
assert.False(NewRateSampler(0).Sample(newBasicSpan("test")))
assert.False(NewRateSampler(0).Sample(newBasicSpan("test")))
assert.False(NewRateSampler(1).Sample(internal.NoopSpan{}))
}

func TestRateSamplerFinishedSpan(t *testing.T) {
rs := NewRateSampler(0.9999)
tracer := newTracer(WithSampler(rs)) // high probability of sampling
span := newBasicSpan("test")
span.finished = true
tracer.sample(span)
if !rs.Sample(span) {
t.Skip("wasn't sampled") // no flaky tests
}
_, ok := span.Metrics[sampleRateMetricKey]
assert.False(t, ok)
assert.False(NewRateSampler(0.99).Sample(internal.NoopSpan{}))
}

func TestRateSamplerSetting(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (s *span) finish(finishTime int64) {
}
s.finished = true

if !s.context.sampled {
// not sampled
if s.context.drop {
// not sampled by local sampler
return
}
s.context.finish()
Expand Down
15 changes: 11 additions & 4 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,19 @@ func TestSpanSetMetric(t *testing.T) {

// check the map is properly initialized
span.SetTag("bytes", 1024.42)
assert.Equal(1, len(span.Metrics))
assert.Equal(3, len(span.Metrics))
assert.Equal(1024.42, span.Metrics["bytes"])
_, ok := span.Metrics[samplingPriorityKey]
assert.True(ok)
_, ok = span.Metrics[samplingPriorityRateKey]
assert.True(ok)

// operating on a finished span is a no-op
span.Finish()
span.SetTag("finished.test", 1337)
assert.Equal(1, len(span.Metrics))
assert.Equal(0.0, span.Metrics["finished.test"])
assert.Equal(3, len(span.Metrics))
_, ok = span.Metrics["finished.test"]
assert.False(ok)
}

func TestSpanError(t *testing.T) {
Expand Down Expand Up @@ -297,7 +302,9 @@ func TestSpanSamplingPriority(t *testing.T) {

span := tracer.newRootSpan("my.name", "my.service", "my.resource")
_, ok := span.Metrics[samplingPriorityKey]
assert.False(ok)
assert.True(ok)
_, ok = span.Metrics[samplingPriorityRateKey]
assert.True(ok)

for _, priority := range []int{
ext.PriorityUserReject,
Expand Down
9 changes: 4 additions & 5 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ var _ ddtrace.SpanContext = (*spanContext)(nil)
type spanContext struct {
// the below group should propagate only locally

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
sampled bool // whether this span will be sampled or not
trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
drop bool // when true, the span will not be sent to the agent

// the below group should propagate cross-process

Expand All @@ -40,7 +40,6 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
context := &spanContext{
traceID: span.TraceID,
spanID: span.SpanID,
sampled: true,
span: span,
}
if v, ok := span.Metrics[samplingPriorityKey]; ok {
Expand All @@ -49,7 +48,7 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
}
if parent != nil {
context.trace = parent.trace
context.sampled = parent.sampled
context.drop = parent.drop
context.hasPriority = parent.hasSamplingPriority()
context.priority = parent.samplingPriority()
parent.ForeachBaggageItem(func(k, v string) bool {
Expand Down
7 changes: 3 additions & 4 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,14 @@ func TestSpanContextParent(t *testing.T) {
}
for name, parentCtx := range map[string]*spanContext{
"basic": &spanContext{
sampled: false,
baggage: map[string]string{"A": "A", "B": "B"},
trace: newTrace(),
drop: true,
},
"nil-trace": &spanContext{
sampled: false,
drop: true,
},
"priority": &spanContext{
sampled: true,
baggage: map[string]string{"A": "A", "B": "B"},
trace: &trace{spans: []*span{newBasicSpan("abc")}},
hasPriority: true,
Expand All @@ -221,7 +220,7 @@ func TestSpanContextParent(t *testing.T) {
assert.Contains(ctx.trace.spans, s)
assert.Equal(ctx.hasPriority, parentCtx.hasPriority)
assert.Equal(ctx.priority, parentCtx.priority)
assert.Equal(ctx.sampled, parentCtx.sampled)
assert.Equal(ctx.drop, parentCtx.drop)
assert.Equal(ctx.baggage, parentCtx.baggage)
})
}
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
return ErrSpanContextCorrupted
}
case p.cfg.PriorityHeader:
ctx.priority, err = strconv.Atoi(v)
priority, err := strconv.Atoi(v)
if err != nil {
return ErrSpanContextCorrupted
}
ctx.hasPriority = true
ctx.setSamplingPriority(priority)
default:
if strings.HasPrefix(key, p.cfg.BaggagePrefix) {
ctx.setBaggageItem(strings.TrimPrefix(key, p.cfg.BaggagePrefix), v)
Expand Down
47 changes: 22 additions & 25 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type tracer struct {
// a synchronous (blocking) operation, meaning that it will only return after
// the trace has been fully processed and added onto the payload.
syncPush chan struct{}

// prioritySampling holds an instance of the priority sampler.
prioritySampling *prioritySampler
}

const (
Expand Down Expand Up @@ -118,15 +121,16 @@ func newTracer(opts ...StartOption) *tracer {
c.propagator = NewPropagator(nil)
}
t := &tracer{
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
flushErrorsReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
errorBuffer: make(chan error, errorBufferSize),
stopped: make(chan struct{}),
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
flushErrorsReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
errorBuffer: make(chan error, errorBufferSize),
stopped: make(chan struct{}),
prioritySampling: newPrioritySampler(),
}

go t.worker()
Expand Down Expand Up @@ -307,8 +311,8 @@ func (t *tracer) flushTraces() {
if err != nil {
t.pushError(&dataLossError{context: err, count: count})
}
if err == nil && t.config.prioritySampling != nil {
t.config.prioritySampling.readRatesJSON(rc) // TODO: handle error?
if err == nil {
t.prioritySampling.readRatesJSON(rc) // TODO: handle error?
}
t.payload.reset()
}
Expand Down Expand Up @@ -357,24 +361,17 @@ const sampleRateMetricKey = "_sample_rate"

// Sample samples a span with the internal sampler.
func (t *tracer) sample(span *span) {
if span.context.hasPriority {
// sampling decision was already made
return
}
sampler := t.config.sampler
sampled := sampler.Sample(span)
span.context.sampled = sampled
if !sampled {
if !sampler.Sample(span) {
span.context.drop = true
return
}
if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 {
// the span was sampled using a rate sampler which wasn't all permissive,
// so we make note of the sampling rate.
span.Lock()
defer span.Unlock()
if span.finished {
// we don't touch finished span as they might be flushing
return
}
span.Metrics[sampleRateMetricKey] = rs.Rate()
}
if t.config.prioritySampling != nil {
t.config.prioritySampling.apply(span)
}
t.prioritySampling.apply(span)
}
79 changes: 68 additions & 11 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func TestTracerCleanStop(t *testing.T) {
for i := 0; i < n; i++ {
Start(withTransport(transport))
time.Sleep(time.Millisecond)
Start(withTransport(transport))
Start(withTransport(transport))
Start(withTransport(transport), WithSampler(NewRateSampler(0.99)))
Start(withTransport(transport), WithSampler(NewRateSampler(0.99)))
}
}()

Expand Down Expand Up @@ -153,14 +153,26 @@ func TestTracerStart(t *testing.T) {
}

func TestTracerStartSpan(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request").(*span)
assert := assert.New(t)
assert.NotEqual(uint64(0), span.TraceID)
assert.NotEqual(uint64(0), span.SpanID)
assert.Equal(uint64(0), span.ParentID)
assert.Equal("web.request", span.Name)
assert.Equal("tracer.test", span.Service)
t.Run("generic", func(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request").(*span)
assert := assert.New(t)
assert.NotEqual(uint64(0), span.TraceID)
assert.NotEqual(uint64(0), span.SpanID)
assert.Equal(uint64(0), span.ParentID)
assert.Equal("web.request", span.Name)
assert.Equal("tracer.test", span.Service)
assert.Contains([]float64{
ext.PriorityAutoReject,
ext.PriorityAutoKeep,
}, span.Metrics[samplingPriorityKey])
})

t.Run("priority", func(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request", Tag(ext.SamplingPriority, ext.PriorityUserKeep)).(*span)
assert.Equal(t, float64(ext.PriorityUserKeep), span.Metrics[samplingPriorityKey])
})
}

func TestTracerStartSpanOptions(t *testing.T) {
Expand Down Expand Up @@ -386,7 +398,6 @@ func TestTracerPrioritySampler(t *testing.T) {

tr, _, stop := startTestTracer(
withTransport(newHTTPTransport(addr, defaultRoundTripper)),
WithPrioritySampling(),
)
defer stop()

Expand Down Expand Up @@ -856,6 +867,46 @@ func TestPushErr(t *testing.T) {
// if we reach this, means pushError is not blocking, which is what we want to double-check
}

func TestTracerFlush(t *testing.T) {
// https://github.com/DataDog/dd-trace-go/issues/377
tracer, transport, stop := startTestTracer()
defer stop()

t.Run("direct", func(t *testing.T) {
defer transport.Reset()
assert := assert.New(t)
root := tracer.StartSpan("root")
tracer.StartSpan("child.direct", ChildOf(root.Context())).Finish()
root.Finish()
tracer.forceFlush()

list := transport.Traces()
assert.Len(list, 1)
assert.Len(list[0], 2)
assert.Equal("child.direct", list[0][1].Name)
})

t.Run("extracted", func(t *testing.T) {
defer transport.Reset()
assert := assert.New(t)
root := tracer.StartSpan("root")
h := HTTPHeadersCarrier(http.Header{})
if err := tracer.Inject(root.Context(), h); err != nil {
t.Fatal(err)
}
sctx, err := tracer.Extract(h)
if err != nil {
t.Fatal(err)
}
tracer.StartSpan("child.extracted", ChildOf(sctx)).Finish()
tracer.forceFlush()
list := transport.Traces()
assert.Len(list, 1)
assert.Len(list[0], 1)
assert.Equal("child.extracted", list[0][0].Name)
})
}

// BenchmarkConcurrentTracing tests the performance of spawning a lot of
// goroutines where each one creates a trace with a parent and a child.
func BenchmarkConcurrentTracing(b *testing.B) {
Expand Down Expand Up @@ -938,6 +989,12 @@ func encode(traces [][]*span) (*payload, error) {
return p, nil
}

func (t *dummyTransport) Reset() {
t.Lock()
t.traces = t.traces[:0]
t.Unlock()
}

func (t *dummyTransport) Traces() spanLists {
t.Lock()
defer t.Unlock()
Expand Down

0 comments on commit 7fb2bce

Please sign in to comment.