Skip to content

Commit

Permalink
ddtrace/tracer: switch to go.uber.org/atomic
Browse files Browse the repository at this point in the history
sync/atomic has several issues. Among them is that it causes a panic when a
field isn't correctly aligned. Alignment must me manually ensured and is
easy to forget.

Instead, we will use go.uber.org/atomic which provides wrappers around Go's
standard library atomics. The wrappers ensure we always use atomic
operations to load and store the values, and also prevent the alignment
issue.
  • Loading branch information
knusbaum committed Aug 23, 2022
1 parent 2bc6d1b commit db61ba1
Show file tree
Hide file tree
Showing 15 changed files with 97 additions and 76 deletions.
7 changes: 3 additions & 4 deletions ddtrace/tracer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package tracer
import (
"runtime"
"runtime/debug"
"sync/atomic"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -99,9 +98,9 @@ func (t *tracer) reportHealthMetrics(interval time.Duration) {
for {
select {
case <-ticker.C:
t.config.statsd.Count("datadog.tracer.spans_started", atomic.SwapInt64(&t.spansStarted, 0), nil, 1)
t.config.statsd.Count("datadog.tracer.spans_finished", atomic.SwapInt64(&t.spansFinished, 0), nil, 1)
t.config.statsd.Count("datadog.tracer.traces_dropped", atomic.SwapInt64(&t.tracesDropped, 0), []string{"reason:trace_too_large"}, 1)
t.config.statsd.Count("datadog.tracer.spans_started", t.spansStarted.Swap(0), nil, 1)
t.config.statsd.Count("datadog.tracer.spans_finished", t.spansFinished.Swap(0), nil, 1)
t.config.statsd.Count("datadog.tracer.traces_dropped", t.tracesDropped.Swap(0), []string{"reason:trace_too_large"}, 1)
case <-t.stop:
return
}
Expand Down
11 changes: 6 additions & 5 deletions ddtrace/tracer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"bytes"
"encoding/binary"
"io"
"sync/atomic"

"github.com/tinylib/msgp/msgp"
uatomic "go.uber.org/atomic"
)

// payload is a wrapper on top of the msgpack encoder which allows constructing an
Expand All @@ -36,7 +36,7 @@ type payload struct {
off int

// count specifies the number of items in the stream.
count uint64
count uatomic.Uint64

// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer
Expand All @@ -58,14 +58,15 @@ func (p *payload) push(t spanList) error {
if err := msgp.Encode(&p.buf, t); err != nil {
return err
}
atomic.AddUint64(&p.count, 1)
//atomic.AddUint64(&p.count, 1)
p.count.Inc()
p.updateHeader()
return nil
}

// itemCount returns the number of items available in the srteam.
func (p *payload) itemCount() int {
return int(atomic.LoadUint64(&p.count))
return int(p.count.Load())
}

// size returns the payload size in bytes. After the first read the value becomes
Expand Down Expand Up @@ -102,7 +103,7 @@ const (
// updateHeader updates the payload header based on the number of items currently
// present in the stream.
func (p *payload) updateHeader() {
n := atomic.LoadUint64(&p.count)
n := p.count.Load()
switch {
case n <= 15:
p.header[7] = msgpackArrayFix + byte(n)
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/tracer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func benchmarkPayloadThroughput(count int) func(*testing.B) {
reset := func() {
p.header = make([]byte, 8)
p.off = 8
p.count = 0
p.count.Store(0)
p.buf.Reset()
}
for i := 0; i < b.N; i++ {
Expand Down
9 changes: 5 additions & 4 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
Expand Down Expand Up @@ -221,13 +220,15 @@ func (s *span) setTagError(value interface{}, cfg errorConfig) {
if yes {
if s.Error == 0 {
// new error
atomic.AddInt64(&s.context.errors, 1)
//atomic.AddInt64(&s.context.errors, 1)
s.context.errors.Inc()
}
s.Error = 1
} else {
if s.Error > 0 {
// flip from active to inactive
atomic.AddInt64(&s.context.errors, -1)
//atomic.AddInt64(&s.context.errors, -1)
s.context.errors.Dec()
}
s.Error = 0
}
Expand Down Expand Up @@ -518,7 +519,7 @@ func shouldKeep(s *span) bool {
// positive sampling priorities stay
return true
}
if atomic.LoadInt64(&s.context.errors) > 0 {
if s.context.errors.Load() > 0 {
// traces with any span containing an error get kept
return true
}
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"math"
"os"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -122,7 +121,8 @@ func TestShouldDrop(t *testing.T) {
s := newSpan("", "", "", 1, 1, 0)
s.SetTag(ext.SamplingPriority, tt.prio)
s.SetTag(ext.EventSampleRate, tt.rate)
atomic.StoreInt64(&s.context.errors, tt.errors)
//atomic.StoreInt64(&s.context.errors, tt.errors)
s.context.errors.Store(tt.errors)
assert.Equal(t, shouldKeep(s), tt.want)
})
}
Expand Down
42 changes: 25 additions & 17 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ package tracer
import (
"strconv"
"sync"
"sync/atomic"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"

uatomic "go.uber.org/atomic"
)

var _ ddtrace.SpanContext = (*spanContext)(nil)
Expand All @@ -25,9 +26,9 @@ var _ ddtrace.SpanContext = (*spanContext)(nil)
type spanContext struct {
// the below group should propagate only locally

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
errors int64 // number of spans with errors in this trace
trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
errors uatomic.Int64 // number of spans with errors in this trace

// the below group should propagate cross-process

Expand All @@ -36,8 +37,8 @@ type spanContext struct {

mu sync.RWMutex // guards below fields
baggage map[string]string
hasBaggage int32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
origin string // e.g. "synthetics"
hasBaggage uatomic.Int32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
origin string // e.g. "synthetics"
}

// newSpanContext creates a new SpanContext to serve as context for the given
Expand Down Expand Up @@ -80,7 +81,7 @@ func (c *spanContext) TraceID() uint64 { return c.traceID }

// ForeachBaggageItem implements ddtrace.SpanContext.
func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
if atomic.LoadInt32(&c.hasBaggage) == 0 {
if c.hasBaggage.Load() == 0 {
return
}
c.mu.RLock()
Expand Down Expand Up @@ -110,14 +111,14 @@ func (c *spanContext) setBaggageItem(key, val string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.baggage == nil {
atomic.StoreInt32(&c.hasBaggage, 1)
c.hasBaggage.Store(1)
c.baggage = make(map[string]string, 1)
}
c.baggage[key] = val
}

func (c *spanContext) baggageItem(key string) string {
if atomic.LoadInt32(&c.hasBaggage) == 0 {
if c.hasBaggage.Load() == 0 {
return ""
}
c.mu.RLock()
Expand All @@ -136,12 +137,14 @@ func (c *spanContext) meta(key string) (val string, ok bool) {
func (c *spanContext) finish() { c.trace.finishedOne(c.span) }

// samplingDecision is the decision to send a trace to the agent or not.
type samplingDecision int64
type samplingDecision struct {
uatomic.Int64
}

const (
// decisionNone is the default state of a trace.
// If no decision is made about the trace, the trace won't be sent to the agent.
decisionNone samplingDecision = iota
decisionNone int64 = iota
// decisionDrop prevents the trace from being sent to the agent.
decisionDrop
// decisionKeep ensures the trace will be sent to the agent.
Expand Down Expand Up @@ -208,11 +211,11 @@ func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName, rat
}

func (t *trace) keep() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionKeep))
t.samplingDecision.CompareAndSwap(decisionNone, decisionKeep)
}

func (t *trace) drop() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop))
t.samplingDecision.CompareAndSwap(decisionNone, decisionDrop)
}

func (t *trace) setTag(key, value string) {
Expand Down Expand Up @@ -279,7 +282,8 @@ func (t *trace) push(sp *span) {
t.spans = nil // GC
log.Error("trace buffer full (%d), dropping trace", traceMaxSize)
if haveTracer {
atomic.AddInt64(&tr.tracesDropped, 1)
//atomic.AddInt64(&tr.tracesDropped, 1)
tr.tracesDropped.Inc()
}
return
}
Expand All @@ -288,7 +292,8 @@ func (t *trace) push(sp *span) {
}
t.spans = append(t.spans, sp)
if haveTracer {
atomic.AddInt64(&tr.spansStarted, 1)
//atomic.AddInt64(&tr.spansStarted, 1)
tr.spansStarted.Inc()
}
}

Expand Down Expand Up @@ -338,9 +343,12 @@ func (t *trace) finishedOne(s *span) {
return
}
// we have a tracer that can receive completed traces.
atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
//atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
tr.spansFinished.Add(int64(len(t.spans)))
var decision samplingDecision
decision.Store(t.samplingDecision.Load())
tr.pushTrace(&finishedTrace{
spans: t.spans,
decision: samplingDecision(atomic.LoadInt64((*int64)(&t.samplingDecision))),
decision: decision,
})
}
17 changes: 9 additions & 8 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"

"github.com/stretchr/testify/assert"
uatomic "go.uber.org/atomic"
)

func setupteardown(start, max int) func() {
Expand Down Expand Up @@ -305,24 +306,24 @@ func TestSpanContextParent(t *testing.T) {
for name, parentCtx := range map[string]*spanContext{
"basic": &spanContext{
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
hasBaggage: *uatomic.NewInt32(1),
trace: newTrace(),
},
"nil-trace": &spanContext{},
"priority": &spanContext{
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
hasBaggage: *uatomic.NewInt32(1),
trace: &trace{
spans: []*span{newBasicSpan("abc")},
priority: func() *float64 { v := new(float64); *v = 2; return v }(),
},
},
"sampling_decision": &spanContext{
baggage: map[string]string{"A": "A", "B": "B"},
hasBaggage: 1,
hasBaggage: *uatomic.NewInt32(1),
trace: &trace{
spans: []*span{newBasicSpan("abc")},
samplingDecision: decisionKeep,
samplingDecision: samplingDecision{*uatomic.NewInt64(decisionKeep)},
},
},
"origin": &spanContext{
Expand Down Expand Up @@ -386,7 +387,7 @@ func TestSpanContextIterator(t *testing.T) {
assert := assert.New(t)

got := make(map[string]string)
ctx := spanContext{baggage: map[string]string{"key": "value"}, hasBaggage: 1}
ctx := spanContext{baggage: map[string]string{"key": "value"}, hasBaggage: *uatomic.NewInt32(1)}
ctx.ForeachBaggageItem(func(k, v string) bool {
got[k] = v
return true
Expand Down Expand Up @@ -434,7 +435,7 @@ func (tp *testLogger) Reset() {
}

func BenchmarkBaggageItemPresent(b *testing.B) {
ctx := spanContext{baggage: map[string]string{"key": "value"}, hasBaggage: 1}
ctx := spanContext{baggage: map[string]string{"key": "value"}, hasBaggage: *uatomic.NewInt32(1)}
for n := 0; n < b.N; n++ {
ctx.ForeachBaggageItem(func(k, v string) bool {
return true
Expand Down
12 changes: 7 additions & 5 deletions ddtrace/tracer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ package tracer

import (
"sync"
"sync/atomic"
"time"

uatomic "go.uber.org/atomic"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"

"github.com/DataDog/datadog-go/v5/statsd"
Expand Down Expand Up @@ -52,7 +52,7 @@ type concentrator struct {
buckets map[int64]*rawBucket

// stopped reports whether the concentrator is stopped (when non-zero)
stopped uint64
stopped uatomic.Uint64

wg sync.WaitGroup // waits for any active goroutines
bucketSize int64 // the size of a bucket in nanoseconds
Expand All @@ -63,10 +63,12 @@ type concentrator struct {
// newConcentrator creates a new concentrator using the given tracer
// configuration c. It creates buckets of bucketSize nanoseconds duration.
func newConcentrator(c *config, bucketSize int64) *concentrator {
var stopped uatomic.Uint64
stopped.Store(1)
return &concentrator{
In: make(chan *aggregableSpan, 10000),
bucketSize: bucketSize,
stopped: 1,
stopped: stopped,
buckets: make(map[int64]*rawBucket),
cfg: c,
}
Expand All @@ -79,7 +81,7 @@ func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize }
// Start starts the concentrator. A started concentrator needs to be stopped
// in order to gracefully shut down, using Stop.
func (c *concentrator) Start() {
if atomic.SwapUint64(&c.stopped, 0) == 0 {
if c.stopped.Swap(0) == 0 {
// already running
log.Warn("(*concentrator).Start called more than once. This is likely a programming error.")
return
Expand Down Expand Up @@ -149,7 +151,7 @@ func (c *concentrator) add(s *aggregableSpan) {

// Stop stops the concentrator and blocks until the operation completes.
func (c *concentrator) Stop() {
if atomic.SwapUint64(&c.stopped, 1) > 0 {
if c.stopped.Swap(1) > 0 {
return
}
close(c.stop)
Expand Down

0 comments on commit db61ba1

Please sign in to comment.