Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/tracer: priority sampling on by default #373

Merged
merged 6 commits into from
Dec 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ type config struct {
// sampler specifies the sampler that will be used for sampling traces.
sampler Sampler

// prioritySampling will be non-nil when priority sampling is enabled.
prioritySampling *prioritySampler

// agentAddr specifies the hostname and of the agent where the traces
// are sent to.
agentAddr string
Expand Down Expand Up @@ -52,13 +49,14 @@ func defaults(c *config) {
c.agentAddr = defaultAddress
}

// WithPrioritySampling enables priority sampling on the active tracer instance. When using
// distributed tracing, this option must be enabled in order to get all the parts of a distributed
// trace sampled. To learn more about priority sampling, please visit:
// WithPrioritySampling is deprecated, and priority sampling is enabled by default.
// When using distributed tracing, the priority sampling value is propagated in order to
// get all the parts of a distributed trace sampled.
// To learn more about priority sampling, please visit:
// https://docs.datadoghq.com/tracing/getting_further/trace_sampling_and_storage/#priority-sampling-for-distributed-tracing
func WithPrioritySampling() StartOption {
return func(c *config) {
c.prioritySampling = newPrioritySampler()
// This is now enabled by default.
}
}

Expand Down
4 changes: 4 additions & 0 deletions ddtrace/tracer/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ const knuthFactor = uint64(1111111111111111111)

// Sample returns true if the given span should be sampled.
func (r *rateSampler) Sample(spn ddtrace.Span) bool {
if r.rate == 1 {
// fast path
return true
}
s, ok := spn.(*span)
if !ok {
return false
Expand Down
15 changes: 1 addition & 14 deletions ddtrace/tracer/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,7 @@ func TestRateSampler(t *testing.T) {
assert.True(NewRateSampler(1).Sample(newBasicSpan("test")))
assert.False(NewRateSampler(0).Sample(newBasicSpan("test")))
assert.False(NewRateSampler(0).Sample(newBasicSpan("test")))
assert.False(NewRateSampler(1).Sample(internal.NoopSpan{}))
}

func TestRateSamplerFinishedSpan(t *testing.T) {
rs := NewRateSampler(0.9999)
tracer := newTracer(WithSampler(rs)) // high probability of sampling
span := newBasicSpan("test")
span.finished = true
tracer.sample(span)
if !rs.Sample(span) {
t.Skip("wasn't sampled") // no flaky tests
}
_, ok := span.Metrics[sampleRateMetricKey]
assert.False(t, ok)
assert.False(NewRateSampler(0.99).Sample(internal.NoopSpan{}))
}

func TestRateSamplerSetting(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (s *span) finish(finishTime int64) {
}
s.finished = true

if !s.context.sampled {
// not sampled
if s.context.drop {
// not sampled by local sampler
return
}
s.context.finish()
Expand Down
15 changes: 11 additions & 4 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,19 @@ func TestSpanSetMetric(t *testing.T) {

// check the map is properly initialized
span.SetTag("bytes", 1024.42)
assert.Equal(1, len(span.Metrics))
assert.Equal(3, len(span.Metrics))
assert.Equal(1024.42, span.Metrics["bytes"])
_, ok := span.Metrics[samplingPriorityKey]
assert.True(ok)
_, ok = span.Metrics[samplingPriorityRateKey]
assert.True(ok)

// operating on a finished span is a no-op
span.Finish()
span.SetTag("finished.test", 1337)
assert.Equal(1, len(span.Metrics))
assert.Equal(0.0, span.Metrics["finished.test"])
assert.Equal(3, len(span.Metrics))
_, ok = span.Metrics["finished.test"]
assert.False(ok)
}

func TestSpanError(t *testing.T) {
Expand Down Expand Up @@ -297,7 +302,9 @@ func TestSpanSamplingPriority(t *testing.T) {

span := tracer.newRootSpan("my.name", "my.service", "my.resource")
_, ok := span.Metrics[samplingPriorityKey]
assert.False(ok)
assert.True(ok)
_, ok = span.Metrics[samplingPriorityRateKey]
assert.True(ok)

for _, priority := range []int{
ext.PriorityUserReject,
Expand Down
9 changes: 4 additions & 5 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ var _ ddtrace.SpanContext = (*spanContext)(nil)
type spanContext struct {
// the below group should propagate only locally

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
sampled bool // whether this span will be sampled or not
trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
drop bool // when true, the span will not be sent to the agent

// the below group should propagate cross-process

Expand All @@ -40,7 +40,6 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
context := &spanContext{
traceID: span.TraceID,
spanID: span.SpanID,
sampled: true,
span: span,
}
if v, ok := span.Metrics[samplingPriorityKey]; ok {
Expand All @@ -49,7 +48,7 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
}
if parent != nil {
context.trace = parent.trace
context.sampled = parent.sampled
context.drop = parent.drop
context.hasPriority = parent.hasSamplingPriority()
context.priority = parent.samplingPriority()
parent.ForeachBaggageItem(func(k, v string) bool {
Expand Down
7 changes: 3 additions & 4 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,14 @@ func TestSpanContextParent(t *testing.T) {
}
for name, parentCtx := range map[string]*spanContext{
"basic": &spanContext{
sampled: false,
baggage: map[string]string{"A": "A", "B": "B"},
trace: newTrace(),
drop: true,
},
"nil-trace": &spanContext{
sampled: false,
drop: true,
},
"priority": &spanContext{
sampled: true,
baggage: map[string]string{"A": "A", "B": "B"},
trace: &trace{spans: []*span{newBasicSpan("abc")}},
hasPriority: true,
Expand All @@ -221,7 +220,7 @@ func TestSpanContextParent(t *testing.T) {
assert.Contains(ctx.trace.spans, s)
assert.Equal(ctx.hasPriority, parentCtx.hasPriority)
assert.Equal(ctx.priority, parentCtx.priority)
assert.Equal(ctx.sampled, parentCtx.sampled)
assert.Equal(ctx.drop, parentCtx.drop)
assert.Equal(ctx.baggage, parentCtx.baggage)
})
}
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
return ErrSpanContextCorrupted
}
case p.cfg.PriorityHeader:
ctx.priority, err = strconv.Atoi(v)
priority, err := strconv.Atoi(v)
if err != nil {
return ErrSpanContextCorrupted
}
ctx.hasPriority = true
ctx.setSamplingPriority(priority)
default:
if strings.HasPrefix(key, p.cfg.BaggagePrefix) {
ctx.setBaggageItem(strings.TrimPrefix(key, p.cfg.BaggagePrefix), v)
Expand Down
47 changes: 22 additions & 25 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type tracer struct {
// a synchronous (blocking) operation, meaning that it will only return after
// the trace has been fully processed and added onto the payload.
syncPush chan struct{}

// prioritySampling holds an instance of the priority sampler.
prioritySampling *prioritySampler
}

const (
Expand Down Expand Up @@ -118,15 +121,16 @@ func newTracer(opts ...StartOption) *tracer {
c.propagator = NewPropagator(nil)
}
t := &tracer{
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
flushErrorsReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
errorBuffer: make(chan error, errorBufferSize),
stopped: make(chan struct{}),
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
flushErrorsReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
errorBuffer: make(chan error, errorBufferSize),
stopped: make(chan struct{}),
prioritySampling: newPrioritySampler(),
}

go t.worker()
Expand Down Expand Up @@ -307,8 +311,8 @@ func (t *tracer) flushTraces() {
if err != nil {
t.pushError(&dataLossError{context: err, count: count})
}
if err == nil && t.config.prioritySampling != nil {
t.config.prioritySampling.readRatesJSON(rc) // TODO: handle error?
if err == nil {
t.prioritySampling.readRatesJSON(rc) // TODO: handle error?
}
t.payload.reset()
}
Expand Down Expand Up @@ -357,24 +361,17 @@ const sampleRateMetricKey = "_sample_rate"

// Sample samples a span with the internal sampler.
func (t *tracer) sample(span *span) {
if span.context.hasPriority {
// sampling decision was already made
return
}
sampler := t.config.sampler
sampled := sampler.Sample(span)
span.context.sampled = sampled
if !sampled {
if !sampler.Sample(span) {
span.context.drop = true
return
}
if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 {
// the span was sampled using a rate sampler which wasn't all permissive,
Copy link
Contributor

@gbbr gbbr Dec 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock and check is not needed since sample is called only for new spans, where the only shared structure is the trace in the context which has its own locks. Having it here created a deadlock between this lock and the lock used when setting a tag in the priority sampler below.

// so we make note of the sampling rate.
span.Lock()
defer span.Unlock()
if span.finished {
// we don't touch finished span as they might be flushing
return
}
span.Metrics[sampleRateMetricKey] = rs.Rate()
}
if t.config.prioritySampling != nil {
t.config.prioritySampling.apply(span)
}
t.prioritySampling.apply(span)
}
79 changes: 68 additions & 11 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func TestTracerCleanStop(t *testing.T) {
for i := 0; i < n; i++ {
Start(withTransport(transport))
time.Sleep(time.Millisecond)
Start(withTransport(transport))
Start(withTransport(transport))
Start(withTransport(transport), WithSampler(NewRateSampler(0.99)))
Start(withTransport(transport), WithSampler(NewRateSampler(0.99)))
}
}()

Expand Down Expand Up @@ -153,14 +153,26 @@ func TestTracerStart(t *testing.T) {
}

func TestTracerStartSpan(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request").(*span)
assert := assert.New(t)
assert.NotEqual(uint64(0), span.TraceID)
assert.NotEqual(uint64(0), span.SpanID)
assert.Equal(uint64(0), span.ParentID)
assert.Equal("web.request", span.Name)
assert.Equal("tracer.test", span.Service)
t.Run("generic", func(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request").(*span)
assert := assert.New(t)
assert.NotEqual(uint64(0), span.TraceID)
assert.NotEqual(uint64(0), span.SpanID)
assert.Equal(uint64(0), span.ParentID)
assert.Equal("web.request", span.Name)
assert.Equal("tracer.test", span.Service)
assert.Contains([]float64{
ext.PriorityAutoReject,
ext.PriorityAutoKeep,
}, span.Metrics[samplingPriorityKey])
})

t.Run("priority", func(t *testing.T) {
tracer := newTracer()
span := tracer.StartSpan("web.request", Tag(ext.SamplingPriority, ext.PriorityUserKeep)).(*span)
assert.Equal(t, float64(ext.PriorityUserKeep), span.Metrics[samplingPriorityKey])
})
}

func TestTracerStartSpanOptions(t *testing.T) {
Expand Down Expand Up @@ -386,7 +398,6 @@ func TestTracerPrioritySampler(t *testing.T) {

tr, _, stop := startTestTracer(
withTransport(newHTTPTransport(addr, defaultRoundTripper)),
WithPrioritySampling(),
)
defer stop()

Expand Down Expand Up @@ -856,6 +867,46 @@ func TestPushErr(t *testing.T) {
// if we reach this, means pushError is not blocking, which is what we want to double-check
}

func TestTracerFlush(t *testing.T) {
// https://github.com/DataDog/dd-trace-go/issues/377
tracer, transport, stop := startTestTracer()
defer stop()

t.Run("direct", func(t *testing.T) {
defer transport.Reset()
assert := assert.New(t)
root := tracer.StartSpan("root")
tracer.StartSpan("child.direct", ChildOf(root.Context())).Finish()
root.Finish()
tracer.forceFlush()

list := transport.Traces()
assert.Len(list, 1)
assert.Len(list[0], 2)
assert.Equal("child.direct", list[0][1].Name)
})

t.Run("extracted", func(t *testing.T) {
defer transport.Reset()
assert := assert.New(t)
root := tracer.StartSpan("root")
h := HTTPHeadersCarrier(http.Header{})
if err := tracer.Inject(root.Context(), h); err != nil {
t.Fatal(err)
}
sctx, err := tracer.Extract(h)
if err != nil {
t.Fatal(err)
}
tracer.StartSpan("child.extracted", ChildOf(sctx)).Finish()
tracer.forceFlush()
list := transport.Traces()
assert.Len(list, 1)
assert.Len(list[0], 1)
assert.Equal("child.extracted", list[0][0].Name)
})
}

// BenchmarkConcurrentTracing tests the performance of spawning a lot of
// goroutines where each one creates a trace with a parent and a child.
func BenchmarkConcurrentTracing(b *testing.B) {
Expand Down Expand Up @@ -938,6 +989,12 @@ func encode(traces [][]*span) (*payload, error) {
return p, nil
}

func (t *dummyTransport) Reset() {
t.Lock()
t.traces = t.traces[:0]
t.Unlock()
}

func (t *dummyTransport) Traces() spanLists {
t.Lock()
defer t.Unlock()
Expand Down