Skip to content

Commit

Permalink
ddtrace/tracer: switch atomics to 32-bit (#1443)
Browse files Browse the repository at this point in the history
sync/atomic has several issues. Among them is that its 64-bit operations
panic on 32-bit platforms when the 64-bit field isn't correctly aligned.
Alignment must be manually ensured and is easy to forget.

Instead, we will use 32-bit atomic integers, which do not require manual
alignment. We can eventually trade them out for the typed atomics (such as
atomic.Int64) that were introduced in Go 1.19, but we have to wait until
Go 1.18 falls out of our supported versions.

Fixes #1418
  • Loading branch information
knusbaum committed Sep 15, 2022
1 parent 95c8e0b commit 3e927ae
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 48 deletions.
6 changes: 3 additions & 3 deletions ddtrace/tracer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func (t *tracer) reportHealthMetrics(interval time.Duration) {
for {
select {
case <-ticker.C:
t.config.statsd.Count("datadog.tracer.spans_started", atomic.SwapInt64(&t.spansStarted, 0), nil, 1)
t.config.statsd.Count("datadog.tracer.spans_finished", atomic.SwapInt64(&t.spansFinished, 0), nil, 1)
t.config.statsd.Count("datadog.tracer.traces_dropped", atomic.SwapInt64(&t.tracesDropped, 0), []string{"reason:trace_too_large"}, 1)
t.config.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1)
t.config.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1)
t.config.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1)
case <-t.stop:
return
}
Expand Down
8 changes: 4 additions & 4 deletions ddtrace/tracer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type payload struct {
off int

// count specifies the number of items in the stream.
count uint64
count uint32

// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer
Expand All @@ -58,14 +58,14 @@ func (p *payload) push(t spanList) error {
if err := msgp.Encode(&p.buf, t); err != nil {
return err
}
atomic.AddUint64(&p.count, 1)
atomic.AddUint32(&p.count, 1)
p.updateHeader()
return nil
}

// itemCount returns the number of items available in the stream.
func (p *payload) itemCount() int {
return int(atomic.LoadUint64(&p.count))
return int(atomic.LoadUint32(&p.count))
}

// size returns the payload size in bytes. After the first read the value becomes
Expand Down Expand Up @@ -102,7 +102,7 @@ const (
// updateHeader updates the payload header based on the number of items currently
// present in the stream.
func (p *payload) updateHeader() {
n := atomic.LoadUint64(&p.count)
n := uint64(atomic.LoadUint32(&p.count))
switch {
case n <= 15:
p.header[7] = msgpackArrayFix + byte(n)
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/tracer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"strconv"
"strings"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,7 +97,7 @@ func benchmarkPayloadThroughput(count int) func(*testing.B) {
reset := func() {
p.header = make([]byte, 8)
p.off = 8
p.count = 0
atomic.StoreUint32(&p.count, 0)
p.buf.Reset()
}
for i := 0; i < b.N; i++ {
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,13 @@ func (s *span) setTagError(value interface{}, cfg errorConfig) {
if yes {
if s.Error == 0 {
// new error
atomic.AddInt64(&s.context.errors, 1)
atomic.AddInt32(&s.context.errors, 1)
}
s.Error = 1
} else {
if s.Error > 0 {
// flip from active to inactive
atomic.AddInt64(&s.context.errors, -1)
atomic.AddInt32(&s.context.errors, -1)
}
s.Error = 0
}
Expand Down Expand Up @@ -518,7 +518,7 @@ func shouldKeep(s *span) bool {
// positive sampling priorities stay
return true
}
if atomic.LoadInt64(&s.context.errors) > 0 {
if atomic.LoadInt32(&s.context.errors) > 0 {
// traces with any span containing an error get kept
return true
}
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestSpanFinishTwice(t *testing.T) {
func TestShouldDrop(t *testing.T) {
for _, tt := range []struct {
prio int
errors int64
errors int32
rate float64
want bool
}{
Expand All @@ -122,7 +122,7 @@ func TestShouldDrop(t *testing.T) {
s := newSpan("", "", "", 1, 1, 0)
s.SetTag(ext.SamplingPriority, tt.prio)
s.SetTag(ext.EventSampleRate, tt.rate)
atomic.StoreInt64(&s.context.errors, tt.errors)
atomic.StoreInt32(&s.context.errors, tt.errors)
assert.Equal(t, shouldKeep(s), tt.want)
})
}
Expand Down
24 changes: 12 additions & 12 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type spanContext struct {

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
errors int64 // number of spans with errors in this trace
errors int32 // number of spans with errors in this trace

// the below group should propagate cross-process

Expand All @@ -36,7 +36,7 @@ type spanContext struct {

mu sync.RWMutex // guards below fields
baggage map[string]string
hasBaggage int32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
hasBaggage uint32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
origin string // e.g. "synthetics"
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (c *spanContext) TraceID() uint64 { return c.traceID }

// ForeachBaggageItem implements ddtrace.SpanContext.
func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
if atomic.LoadInt32(&c.hasBaggage) == 0 {
if atomic.LoadUint32(&c.hasBaggage) == 0 {
return
}
c.mu.RLock()
Expand Down Expand Up @@ -110,14 +110,14 @@ func (c *spanContext) setBaggageItem(key, val string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.baggage == nil {
atomic.StoreInt32(&c.hasBaggage, 1)
atomic.StoreUint32(&c.hasBaggage, 1)
c.baggage = make(map[string]string, 1)
}
c.baggage[key] = val
}

func (c *spanContext) baggageItem(key string) string {
if atomic.LoadInt32(&c.hasBaggage) == 0 {
if atomic.LoadUint32(&c.hasBaggage) == 0 {
return ""
}
c.mu.RLock()
Expand All @@ -136,7 +136,7 @@ func (c *spanContext) meta(key string) (val string, ok bool) {
func (c *spanContext) finish() { c.trace.finishedOne(c.span) }

// samplingDecision is the decision to send a trace to the agent or not.
type samplingDecision int64
type samplingDecision uint32

const (
// decisionNone is the default state of a trace.
Expand Down Expand Up @@ -208,11 +208,11 @@ func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName, rat
}

func (t *trace) keep() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionKeep))
atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionKeep))
}

func (t *trace) drop() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop))
atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionDrop))
}

func (t *trace) setTag(key, value string) {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (t *trace) push(sp *span) {
t.spans = nil // GC
log.Error("trace buffer full (%d), dropping trace", traceMaxSize)
if haveTracer {
atomic.AddInt64(&tr.tracesDropped, 1)
atomic.AddUint32(&tr.tracesDropped, 1)
}
return
}
Expand All @@ -288,7 +288,7 @@ func (t *trace) push(sp *span) {
}
t.spans = append(t.spans, sp)
if haveTracer {
atomic.AddInt64(&tr.spansStarted, 1)
atomic.AddUint32(&tr.spansStarted, 1)
}
}

Expand Down Expand Up @@ -338,9 +338,9 @@ func (t *trace) finishedOne(s *span) {
return
}
// we have a tracer that can receive completed traces.
atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
atomic.AddUint32(&tr.spansFinished, uint32(len(t.spans)))
tr.pushTrace(&finishedTrace{
spans: t.spans,
decision: samplingDecision(atomic.LoadInt64((*int64)(&t.samplingDecision))),
decision: samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
})
}
4 changes: 2 additions & 2 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"

"github.com/stretchr/testify/assert"
)

func setupteardown(start, max int) func() {
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type concentrator struct {
buckets map[int64]*rawBucket

// stopped reports whether the concentrator is stopped (when non-zero)
stopped uint64
stopped uint32

wg sync.WaitGroup // waits for any active goroutines
bucketSize int64 // the size of a bucket in nanoseconds
Expand All @@ -79,7 +79,7 @@ func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize }
// Start starts the concentrator. A started concentrator needs to be stopped
// in order to gracefully shut down, using Stop.
func (c *concentrator) Start() {
if atomic.SwapUint64(&c.stopped, 0) == 0 {
if atomic.SwapUint32(&c.stopped, 0) == 0 {
// already running
log.Warn("(*concentrator).Start called more than once. This is likely a programming error.")
return
Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *concentrator) add(s *aggregableSpan) {

// Stop stops the concentrator and blocks until the operation completes.
func (c *concentrator) Stop() {
if atomic.SwapUint64(&c.stopped, 1) > 0 {
if atomic.SwapUint32(&c.stopped, 1) > 0 {
return
}
close(c.stop)
Expand Down
15 changes: 8 additions & 7 deletions ddtrace/tracer/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package tracer

import (
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -60,26 +61,26 @@ func TestConcentrator(t *testing.T) {
assert.Nil(c.stop)
assert.NotNil(c.buckets)
assert.Equal(c.cfg, cfg)
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
})

t.Run("start-stop", func(t *testing.T) {
assert := assert.New(t)
c := newConcentrator(&config{}, defaultStatsBucketSize)
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
c.Start()
assert.EqualValues(c.stopped, 0)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 0)
c.Stop()
c.Stop()
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
c.Start()
assert.EqualValues(c.stopped, 0)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 0)
c.Start()
c.Start()
assert.EqualValues(c.stopped, 0)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 0)
c.Stop()
c.Stop()
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
})

t.Run("valid", func(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ type tracer struct {

// These integers track metrics about spans and traces as they are started,
// finished, and dropped
spansStarted, spansFinished, tracesDropped int64
spansStarted, spansFinished, tracesDropped uint32

// Records the number of dropped P0 traces and spans.
droppedP0Traces, droppedP0Spans uint64
droppedP0Traces, droppedP0Spans uint32

// partialTraces is the number of partially dropped traces.
partialTraces uint64
partialTraces uint32

// rulesSampling holds an instance of the rules sampler used to apply either trace sampling,
// or single span sampling rules on spans. These are user-defined
Expand Down Expand Up @@ -344,13 +344,13 @@ func (t *tracer) sampleFinishedTrace(info *finishedTrace) {
}
if len(kept) > 0 && len(kept) < len(info.spans) {
// Some spans in the trace were kept, so a partial trace will be sent.
atomic.AddUint64(&t.partialTraces, 1)
atomic.AddUint32(&t.partialTraces, 1)
}
}
if len(kept) == 0 {
atomic.AddUint64(&t.droppedP0Traces, 1)
atomic.AddUint32(&t.droppedP0Traces, 1)
}
atomic.AddUint64(&t.droppedP0Spans, uint64(len(info.spans)-len(kept)))
atomic.AddUint32(&t.droppedP0Spans, uint32(len(info.spans)-len(kept)))
info.spans = kept
}

Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ func TestSamplingDecision(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer func() {
// Must check these after tracer is stopped to avoid flakiness
assert.Equal(t, uint64(1), tracer.droppedP0Traces)
assert.Equal(t, uint64(2), tracer.droppedP0Spans)
assert.Equal(t, uint32(1), tracer.droppedP0Traces)
assert.Equal(t, uint32(2), tracer.droppedP0Spans)
}()
defer stop()
tracer.config.agent.DropP0s = true
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
if t.config.canComputeStats() {
req.Header.Set("Datadog-Client-Computed-Stats", "yes")
}
droppedTraces := int(atomic.SwapUint64(&t.droppedP0Traces, 0))
partialTraces := int(atomic.SwapUint64(&t.partialTraces, 0))
droppedSpans := int(atomic.SwapUint64(&t.droppedP0Spans, 0))
droppedTraces := int(atomic.SwapUint32(&t.droppedP0Traces, 0))
partialTraces := int(atomic.SwapUint32(&t.partialTraces, 0))
droppedSpans := int(atomic.SwapUint32(&t.droppedP0Spans, 0))
if stats := t.config.statsd; stats != nil {
stats.Count("datadog.tracer.dropped_p0_traces", int64(droppedTraces),
[]string{fmt.Sprintf("partial:%s", strconv.FormatBool(partialTraces > 0))}, 1)
Expand Down

0 comments on commit 3e927ae

Please sign in to comment.