From 3e927ae2998e305a2f23c6a2e5407b25df8aad4a Mon Sep 17 00:00:00 2001 From: Kyle Nusbaum Date: Thu, 15 Sep 2022 11:38:36 -0500 Subject: [PATCH] ddtrace/tracer: switch atomics to 32-bit (#1443) sync/atomic has several issues. Among them is that it causes a panic when a 64-bit field isn't correctly aligned. Alignment must be manually ensured and is easy to forget. Instead, we will use 32-bit atomic integers which do not require manual alignment. We can eventually trade them out for Go's new atomics APIs that were introduced in go1.19, but we have to wait until 1.18 falls out of our supported versions. Fixes #1418 --- ddtrace/tracer/metrics.go | 6 +++--- ddtrace/tracer/payload.go | 8 ++++---- ddtrace/tracer/payload_test.go | 3 ++- ddtrace/tracer/span.go | 6 +++--- ddtrace/tracer/span_test.go | 4 ++-- ddtrace/tracer/spancontext.go | 24 ++++++++++++------------ ddtrace/tracer/spancontext_test.go | 4 ++-- ddtrace/tracer/stats.go | 6 +++--- ddtrace/tracer/stats_test.go | 15 ++++++++------- ddtrace/tracer/tracer.go | 12 ++++++------ ddtrace/tracer/tracer_test.go | 4 ++-- ddtrace/tracer/transport.go | 6 +++--- 12 files changed, 50 insertions(+), 48 deletions(-) diff --git a/ddtrace/tracer/metrics.go b/ddtrace/tracer/metrics.go index 8f1738d724..858b6ae1c6 100644 --- a/ddtrace/tracer/metrics.go +++ b/ddtrace/tracer/metrics.go @@ -99,9 +99,9 @@ func (t *tracer) reportHealthMetrics(interval time.Duration) { for { select { case <-ticker.C: - t.config.statsd.Count("datadog.tracer.spans_started", atomic.SwapInt64(&t.spansStarted, 0), nil, 1) - t.config.statsd.Count("datadog.tracer.spans_finished", atomic.SwapInt64(&t.spansFinished, 0), nil, 1) - t.config.statsd.Count("datadog.tracer.traces_dropped", atomic.SwapInt64(&t.tracesDropped, 0), []string{"reason:trace_too_large"}, 1) + t.config.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1) + t.config.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1) + t.config.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1) case <-t.stop: return } diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 2107ac0635..352f62cd67 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -36,7 +36,7 @@ type payload struct { off int // count specifies the number of items in the stream. - count uint64 + count uint32 // buf holds the sequence of msgpack-encoded items. buf bytes.Buffer @@ -58,14 +58,14 @@ func (p *payload) push(t spanList) error { if err := msgp.Encode(&p.buf, t); err != nil { return err } - atomic.AddUint64(&p.count, 1) + atomic.AddUint32(&p.count, 1) p.updateHeader() return nil } // itemCount returns the number of items available in the srteam. func (p *payload) itemCount() int { - return int(atomic.LoadUint64(&p.count)) + return int(atomic.LoadUint32(&p.count)) } // size returns the payload size in bytes. After the first read the value becomes @@ -102,7 +102,7 @@ const ( // updateHeader updates the payload header based on the number of items currently // present in the stream. func (p *payload) updateHeader() { - n := atomic.LoadUint64(&p.count) + n := uint64(atomic.LoadUint32(&p.count)) switch { case n <= 15: p.header[7] = msgpackArrayFix + byte(n) diff --git a/ddtrace/tracer/payload_test.go b/ddtrace/tracer/payload_test.go index 73d72ea0af..cc5082eade 100644 --- a/ddtrace/tracer/payload_test.go +++ b/ddtrace/tracer/payload_test.go @@ -10,6 +10,7 @@ import ( "io" "strconv" "strings" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -96,7 +97,7 @@ func benchmarkPayloadThroughput(count int) func(*testing.B) { reset := func() { p.header = make([]byte, 8) p.off = 8 - p.count = 0 + atomic.StoreUint32(&p.count, 0) p.buf.Reset() } for i := 0; i < b.N; i++ { diff --git a/ddtrace/tracer/span.go b/ddtrace/tracer/span.go index 1567ec0015..b46a238276 100644 --- a/ddtrace/tracer/span.go +++ b/ddtrace/tracer/span.go @@ -221,13 +221,13 @@ func (s *span) setTagError(value interface{}, cfg errorConfig) { if yes { if s.Error == 0 { // new error - atomic.AddInt64(&s.context.errors, 1) + atomic.AddInt32(&s.context.errors, 1) } s.Error = 1 } else { if s.Error > 0 { // flip from active to inactive - atomic.AddInt64(&s.context.errors, -1) + atomic.AddInt32(&s.context.errors, -1) } s.Error = 0 } @@ -518,7 +518,7 @@ func shouldKeep(s *span) bool { // positive sampling priorities stay return true } - if atomic.LoadInt64(&s.context.errors) > 0 { + if atomic.LoadInt32(&s.context.errors) > 0 { // traces with any span containing an error get kept return true } diff --git a/ddtrace/tracer/span_test.go b/ddtrace/tracer/span_test.go index acf1ab05ff..1f64ca0365 100644 --- a/ddtrace/tracer/span_test.go +++ b/ddtrace/tracer/span_test.go @@ -106,7 +106,7 @@ func TestSpanFinishTwice(t *testing.T) { func TestShouldDrop(t *testing.T) { for _, tt := range []struct { prio int - errors int64 + errors int32 rate float64 want bool }{ @@ -122,7 +122,7 @@ func TestShouldDrop(t *testing.T) { s := newSpan("", "", "", 1, 1, 0) s.SetTag(ext.SamplingPriority, tt.prio) s.SetTag(ext.EventSampleRate, tt.rate) - atomic.StoreInt64(&s.context.errors, tt.errors) + atomic.StoreInt32(&s.context.errors, tt.errors) assert.Equal(t, shouldKeep(s), tt.want) }) } diff --git a/ddtrace/tracer/spancontext.go b/ddtrace/tracer/spancontext.go index c7ee376c1f..b0e724e277 100644 --- a/ddtrace/tracer/spancontext.go +++ b/ddtrace/tracer/spancontext.go @@ -27,7 +27,7 @@ type spanContext struct { trace *trace // reference to the trace that this span belongs too span *span // reference to the span that hosts this context - errors int64 // number of spans with errors in this trace + errors int32 // number of spans with errors in this trace // the below group should propagate cross-process @@ -36,7 +36,7 @@ type spanContext struct { mu sync.RWMutex // guards below fields baggage map[string]string - hasBaggage int32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists. + hasBaggage uint32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists. origin string // e.g. "synthetics" } @@ -80,7 +80,7 @@ func (c *spanContext) TraceID() uint64 { return c.traceID } // ForeachBaggageItem implements ddtrace.SpanContext. func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) { - if atomic.LoadInt32(&c.hasBaggage) == 0 { + if atomic.LoadUint32(&c.hasBaggage) == 0 { return } c.mu.RLock() @@ -110,14 +110,14 @@ func (c *spanContext) setBaggageItem(key, val string) { c.mu.Lock() defer c.mu.Unlock() if c.baggage == nil { - atomic.StoreInt32(&c.hasBaggage, 1) + atomic.StoreUint32(&c.hasBaggage, 1) c.baggage = make(map[string]string, 1) } c.baggage[key] = val } func (c *spanContext) baggageItem(key string) string { - if atomic.LoadInt32(&c.hasBaggage) == 0 { + if atomic.LoadUint32(&c.hasBaggage) == 0 { return "" } c.mu.RLock() @@ -136,7 +136,7 @@ func (c *spanContext) meta(key string) (val string, ok bool) { func (c *spanContext) finish() { c.trace.finishedOne(c.span) } // samplingDecision is the decision to send a trace to the agent or not. -type samplingDecision int64 +type samplingDecision uint32 const ( // decisionNone is the default state of a trace. @@ -208,11 +208,11 @@ func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName, rat } func (t *trace) keep() { - atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionKeep)) + atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionKeep)) } func (t *trace) drop() { - atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop)) + atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionDrop)) } func (t *trace) setTag(key, value string) { @@ -279,7 +279,7 @@ func (t *trace) push(sp *span) { t.spans = nil // GC log.Error("trace buffer full (%d), dropping trace", traceMaxSize) if haveTracer { - atomic.AddInt64(&tr.tracesDropped, 1) + atomic.AddUint32(&tr.tracesDropped, 1) } return } @@ -288,7 +288,7 @@ func (t *trace) push(sp *span) { } t.spans = append(t.spans, sp) if haveTracer { - atomic.AddInt64(&tr.spansStarted, 1) + atomic.AddUint32(&tr.spansStarted, 1) } } @@ -338,9 +338,9 @@ func (t *trace) finishedOne(s *span) { return } // we have a tracer that can receive completed traces. - atomic.AddInt64(&tr.spansFinished, int64(len(t.spans))) + atomic.AddUint32(&tr.spansFinished, uint32(len(t.spans))) tr.pushTrace(&finishedTrace{ spans: t.spans, - decision: samplingDecision(atomic.LoadInt64((*int64)(&t.samplingDecision))), + decision: samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))), }) } diff --git a/ddtrace/tracer/spancontext_test.go b/ddtrace/tracer/spancontext_test.go index 22c7821c77..9987631086 100644 --- a/ddtrace/tracer/spancontext_test.go +++ b/ddtrace/tracer/spancontext_test.go @@ -12,11 +12,11 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" + + "github.com/stretchr/testify/assert" ) func setupteardown(start, max int) func() { diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go index 99029de556..3c9ee344f0 100644 --- a/ddtrace/tracer/stats.go +++ b/ddtrace/tracer/stats.go @@ -52,7 +52,7 @@ type concentrator struct { buckets map[int64]*rawBucket // stopped reports whether the concentrator is stopped (when non-zero) - stopped uint64 + stopped uint32 wg sync.WaitGroup // waits for any active goroutines bucketSize int64 // the size of a bucket in nanoseconds @@ -79,7 +79,7 @@ func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize } // Start starts the concentrator. A started concentrator needs to be stopped // in order to gracefully shut down, using Stop. func (c *concentrator) Start() { - if atomic.SwapUint64(&c.stopped, 0) == 0 { + if atomic.SwapUint32(&c.stopped, 0) == 0 { // already running log.Warn("(*concentrator).Start called more than once. This is likely a programming error.") return @@ -149,7 +149,7 @@ func (c *concentrator) add(s *aggregableSpan) { // Stop stops the concentrator and blocks until the operation completes. func (c *concentrator) Stop() { - if atomic.SwapUint64(&c.stopped, 1) > 0 { + if atomic.SwapUint32(&c.stopped, 1) > 0 { return } close(c.stop) diff --git a/ddtrace/tracer/stats_test.go b/ddtrace/tracer/stats_test.go index 6c58e657e4..56e3eb029d 100644 --- a/ddtrace/tracer/stats_test.go +++ b/ddtrace/tracer/stats_test.go @@ -6,6 +6,7 @@ package tracer import ( + "sync/atomic" "testing" "time" @@ -60,26 +61,26 @@ func TestConcentrator(t *testing.T) { assert.Nil(c.stop) assert.NotNil(c.buckets) assert.Equal(c.cfg, cfg) - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) }) t.Run("start-stop", func(t *testing.T) { assert := assert.New(t) c := newConcentrator(&config{}, defaultStatsBucketSize) - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) c.Start() - assert.EqualValues(c.stopped, 0) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 0) c.Stop() c.Stop() - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) c.Start() - assert.EqualValues(c.stopped, 0) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 0) c.Start() c.Start() - assert.EqualValues(c.stopped, 0) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 0) c.Stop() c.Stop() - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) }) t.Run("valid", func(t *testing.T) { diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index cb29718e5b..0f9e3596bf 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -70,13 +70,13 @@ type tracer struct { // These integers track metrics about spans and traces as they are started, // finished, and dropped - spansStarted, spansFinished, tracesDropped int64 + spansStarted, spansFinished, tracesDropped uint32 // Records the number of dropped P0 traces and spans. - droppedP0Traces, droppedP0Spans uint64 + droppedP0Traces, droppedP0Spans uint32 // partialTrace the number of partially dropped traces. - partialTraces uint64 + partialTraces uint32 // rulesSampling holds an instance of the rules sampler used to apply either trace sampling, // or single span sampling rules on spans. These are user-defined @@ -344,13 +344,13 @@ func (t *tracer) sampleFinishedTrace(info *finishedTrace) { } if len(kept) > 0 && len(kept) < len(info.spans) { // Some spans in the trace were kept, so a partial trace will be sent. - atomic.AddUint64(&t.partialTraces, 1) + atomic.AddUint32(&t.partialTraces, 1) } } if len(kept) == 0 { - atomic.AddUint64(&t.droppedP0Traces, 1) + atomic.AddUint32(&t.droppedP0Traces, 1) } - atomic.AddUint64(&t.droppedP0Spans, uint64(len(info.spans)-len(kept))) + atomic.AddUint32(&t.droppedP0Spans, uint32(len(info.spans)-len(kept))) info.spans = kept } diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 01e4cb3207..d05327d95e 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -353,8 +353,8 @@ func TestSamplingDecision(t *testing.T) { tracer, _, _, stop := startTestTracer(t) defer func() { // Must check these after tracer is stopped to avoid flakiness - assert.Equal(t, uint64(1), tracer.droppedP0Traces) - assert.Equal(t, uint64(2), tracer.droppedP0Spans) + assert.Equal(t, uint32(1), tracer.droppedP0Traces) + assert.Equal(t, uint32(2), tracer.droppedP0Spans) }() defer stop() tracer.config.agent.DropP0s = true diff --git a/ddtrace/tracer/transport.go b/ddtrace/tracer/transport.go index 85da05dd6f..339e61201b 100644 --- a/ddtrace/tracer/transport.go +++ b/ddtrace/tracer/transport.go @@ -151,9 +151,9 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) { if t.config.canComputeStats() { req.Header.Set("Datadog-Client-Computed-Stats", "yes") } - droppedTraces := int(atomic.SwapUint64(&t.droppedP0Traces, 0)) - partialTraces := int(atomic.SwapUint64(&t.partialTraces, 0)) - droppedSpans := int(atomic.SwapUint64(&t.droppedP0Spans, 0)) + droppedTraces := int(atomic.SwapUint32(&t.droppedP0Traces, 0)) + partialTraces := int(atomic.SwapUint32(&t.partialTraces, 0)) + droppedSpans := int(atomic.SwapUint32(&t.droppedP0Spans, 0)) if stats := t.config.statsd; stats != nil { stats.Count("datadog.tracer.dropped_p0_traces", int64(droppedTraces), []string{fmt.Sprintf("partial:%s", strconv.FormatBool(partialTraces > 0))}, 1)