Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddtrace/tracer: switch atomics to 32-bit #1443

Merged
merged 1 commit into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions ddtrace/tracer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func (t *tracer) reportHealthMetrics(interval time.Duration) {
for {
select {
case <-ticker.C:
t.config.statsd.Count("datadog.tracer.spans_started", atomic.SwapInt64(&t.spansStarted, 0), nil, 1)
t.config.statsd.Count("datadog.tracer.spans_finished", atomic.SwapInt64(&t.spansFinished, 0), nil, 1)
t.config.statsd.Count("datadog.tracer.traces_dropped", atomic.SwapInt64(&t.tracesDropped, 0), []string{"reason:trace_too_large"}, 1)
t.config.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1)
t.config.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1)
t.config.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1)
case <-t.stop:
return
}
Expand Down
8 changes: 4 additions & 4 deletions ddtrace/tracer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type payload struct {
off int

// count specifies the number of items in the stream.
count uint64
count uint32

// buf holds the sequence of msgpack-encoded items.
buf bytes.Buffer
Expand All @@ -58,14 +58,14 @@ func (p *payload) push(t spanList) error {
if err := msgp.Encode(&p.buf, t); err != nil {
return err
}
atomic.AddUint64(&p.count, 1)
atomic.AddUint32(&p.count, 1)
p.updateHeader()
return nil
}

// itemCount returns the number of items available in the srteam.
func (p *payload) itemCount() int {
return int(atomic.LoadUint64(&p.count))
return int(atomic.LoadUint32(&p.count))
}

// size returns the payload size in bytes. After the first read the value becomes
Expand Down Expand Up @@ -102,7 +102,7 @@ const (
// updateHeader updates the payload header based on the number of items currently
// present in the stream.
func (p *payload) updateHeader() {
n := atomic.LoadUint64(&p.count)
n := uint64(atomic.LoadUint32(&p.count))
knusbaum marked this conversation as resolved.
Show resolved Hide resolved
switch {
case n <= 15:
p.header[7] = msgpackArrayFix + byte(n)
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/tracer/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"strconv"
"strings"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,7 +97,7 @@ func benchmarkPayloadThroughput(count int) func(*testing.B) {
reset := func() {
p.header = make([]byte, 8)
p.off = 8
p.count = 0
atomic.StoreUint32(&p.count, 0)
p.buf.Reset()
}
for i := 0; i < b.N; i++ {
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,13 @@ func (s *span) setTagError(value interface{}, cfg errorConfig) {
if yes {
if s.Error == 0 {
// new error
atomic.AddInt64(&s.context.errors, 1)
atomic.AddInt32(&s.context.errors, 1)
}
s.Error = 1
} else {
if s.Error > 0 {
// flip from active to inactive
atomic.AddInt64(&s.context.errors, -1)
atomic.AddInt32(&s.context.errors, -1)
}
s.Error = 0
}
Expand Down Expand Up @@ -518,7 +518,7 @@ func shouldKeep(s *span) bool {
// positive sampling priorities stay
return true
}
if atomic.LoadInt64(&s.context.errors) > 0 {
if atomic.LoadInt32(&s.context.errors) > 0 {
// traces with any span containing an error get kept
return true
}
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestSpanFinishTwice(t *testing.T) {
func TestShouldDrop(t *testing.T) {
for _, tt := range []struct {
prio int
errors int64
errors int32
rate float64
want bool
}{
Expand All @@ -122,7 +122,7 @@ func TestShouldDrop(t *testing.T) {
s := newSpan("", "", "", 1, 1, 0)
s.SetTag(ext.SamplingPriority, tt.prio)
s.SetTag(ext.EventSampleRate, tt.rate)
atomic.StoreInt64(&s.context.errors, tt.errors)
atomic.StoreInt32(&s.context.errors, tt.errors)
assert.Equal(t, shouldKeep(s), tt.want)
})
}
Expand Down
24 changes: 12 additions & 12 deletions ddtrace/tracer/spancontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type spanContext struct {

trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
errors int64 // number of spans with errors in this trace
errors int32 // number of spans with errors in this trace

// the below group should propagate cross-process

Expand All @@ -36,7 +36,7 @@ type spanContext struct {

mu sync.RWMutex // guards below fields
baggage map[string]string
hasBaggage int32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
hasBaggage uint32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists.
origin string // e.g. "synthetics"
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (c *spanContext) TraceID() uint64 { return c.traceID }

// ForeachBaggageItem implements ddtrace.SpanContext.
func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {
if atomic.LoadInt32(&c.hasBaggage) == 0 {
if atomic.LoadUint32(&c.hasBaggage) == 0 {
return
}
c.mu.RLock()
Expand Down Expand Up @@ -110,14 +110,14 @@ func (c *spanContext) setBaggageItem(key, val string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.baggage == nil {
atomic.StoreInt32(&c.hasBaggage, 1)
atomic.StoreUint32(&c.hasBaggage, 1)
c.baggage = make(map[string]string, 1)
}
c.baggage[key] = val
}

func (c *spanContext) baggageItem(key string) string {
if atomic.LoadInt32(&c.hasBaggage) == 0 {
if atomic.LoadUint32(&c.hasBaggage) == 0 {
return ""
}
c.mu.RLock()
Expand All @@ -136,7 +136,7 @@ func (c *spanContext) meta(key string) (val string, ok bool) {
func (c *spanContext) finish() { c.trace.finishedOne(c.span) }

// samplingDecision is the decision to send a trace to the agent or not.
type samplingDecision int64
type samplingDecision uint32

const (
// decisionNone is the default state of a trace.
Expand Down Expand Up @@ -208,11 +208,11 @@ func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName, rat
}

func (t *trace) keep() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionKeep))
atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionKeep))
}

func (t *trace) drop() {
atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop))
atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionDrop))
}

func (t *trace) setTag(key, value string) {
Expand Down Expand Up @@ -279,7 +279,7 @@ func (t *trace) push(sp *span) {
t.spans = nil // GC
log.Error("trace buffer full (%d), dropping trace", traceMaxSize)
if haveTracer {
atomic.AddInt64(&tr.tracesDropped, 1)
atomic.AddUint32(&tr.tracesDropped, 1)
}
return
}
Expand All @@ -288,7 +288,7 @@ func (t *trace) push(sp *span) {
}
t.spans = append(t.spans, sp)
if haveTracer {
atomic.AddInt64(&tr.spansStarted, 1)
atomic.AddUint32(&tr.spansStarted, 1)
}
}

Expand Down Expand Up @@ -338,9 +338,9 @@ func (t *trace) finishedOne(s *span) {
return
}
// we have a tracer that can receive completed traces.
atomic.AddInt64(&tr.spansFinished, int64(len(t.spans)))
atomic.AddUint32(&tr.spansFinished, uint32(len(t.spans)))
tr.pushTrace(&finishedTrace{
spans: t.spans,
decision: samplingDecision(atomic.LoadInt64((*int64)(&t.samplingDecision))),
decision: samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))),
})
}
4 changes: 2 additions & 2 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"

"github.com/stretchr/testify/assert"
)

func setupteardown(start, max int) func() {
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type concentrator struct {
buckets map[int64]*rawBucket

// stopped reports whether the concentrator is stopped (when non-zero)
stopped uint64
stopped uint32

wg sync.WaitGroup // waits for any active goroutines
bucketSize int64 // the size of a bucket in nanoseconds
Expand All @@ -79,7 +79,7 @@ func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize }
// Start starts the concentrator. A started concentrator needs to be stopped
// in order to gracefully shut down, using Stop.
func (c *concentrator) Start() {
if atomic.SwapUint64(&c.stopped, 0) == 0 {
if atomic.SwapUint32(&c.stopped, 0) == 0 {
// already running
log.Warn("(*concentrator).Start called more than once. This is likely a programming error.")
return
Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *concentrator) add(s *aggregableSpan) {

// Stop stops the concentrator and blocks until the operation completes.
func (c *concentrator) Stop() {
if atomic.SwapUint64(&c.stopped, 1) > 0 {
if atomic.SwapUint32(&c.stopped, 1) > 0 {
return
}
close(c.stop)
Expand Down
15 changes: 8 additions & 7 deletions ddtrace/tracer/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package tracer

import (
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -60,26 +61,26 @@ func TestConcentrator(t *testing.T) {
assert.Nil(c.stop)
assert.NotNil(c.buckets)
assert.Equal(c.cfg, cfg)
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
})

t.Run("start-stop", func(t *testing.T) {
assert := assert.New(t)
c := newConcentrator(&config{}, defaultStatsBucketSize)
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
c.Start()
assert.EqualValues(c.stopped, 0)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 0)
c.Stop()
c.Stop()
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
c.Start()
assert.EqualValues(c.stopped, 0)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 0)
c.Start()
c.Start()
assert.EqualValues(c.stopped, 0)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 0)
c.Stop()
c.Stop()
assert.EqualValues(c.stopped, 1)
assert.EqualValues(atomic.LoadUint32(&c.stopped), 1)
})

t.Run("valid", func(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ type tracer struct {

// These integers track metrics about spans and traces as they are started,
// finished, and dropped
spansStarted, spansFinished, tracesDropped int64
spansStarted, spansFinished, tracesDropped uint32

// Records the number of dropped P0 traces and spans.
droppedP0Traces, droppedP0Spans uint64
droppedP0Traces, droppedP0Spans uint32

// partialTrace the number of partially dropped traces.
partialTraces uint64
partialTraces uint32

// rulesSampling holds an instance of the rules sampler used to apply either trace sampling,
// or single span sampling rules on spans. These are user-defined
Expand Down Expand Up @@ -344,13 +344,13 @@ func (t *tracer) sampleFinishedTrace(info *finishedTrace) {
}
if len(kept) > 0 && len(kept) < len(info.spans) {
// Some spans in the trace were kept, so a partial trace will be sent.
atomic.AddUint64(&t.partialTraces, 1)
atomic.AddUint32(&t.partialTraces, 1)
}
}
if len(kept) == 0 {
atomic.AddUint64(&t.droppedP0Traces, 1)
atomic.AddUint32(&t.droppedP0Traces, 1)
}
atomic.AddUint64(&t.droppedP0Spans, uint64(len(info.spans)-len(kept)))
atomic.AddUint32(&t.droppedP0Spans, uint32(len(info.spans)-len(kept)))
info.spans = kept
}

Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ func TestSamplingDecision(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer func() {
// Must check these after tracer is stopped to avoid flakiness
assert.Equal(t, uint64(1), tracer.droppedP0Traces)
assert.Equal(t, uint64(2), tracer.droppedP0Spans)
assert.Equal(t, uint32(1), tracer.droppedP0Traces)
assert.Equal(t, uint32(2), tracer.droppedP0Spans)
}()
defer stop()
tracer.config.agent.DropP0s = true
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
if t.config.canComputeStats() {
req.Header.Set("Datadog-Client-Computed-Stats", "yes")
}
droppedTraces := int(atomic.SwapUint64(&t.droppedP0Traces, 0))
partialTraces := int(atomic.SwapUint64(&t.partialTraces, 0))
droppedSpans := int(atomic.SwapUint64(&t.droppedP0Spans, 0))
droppedTraces := int(atomic.SwapUint32(&t.droppedP0Traces, 0))
partialTraces := int(atomic.SwapUint32(&t.partialTraces, 0))
droppedSpans := int(atomic.SwapUint32(&t.droppedP0Spans, 0))
if stats := t.config.statsd; stats != nil {
stats.Count("datadog.tracer.dropped_p0_traces", int64(droppedTraces),
[]string{fmt.Sprintf("partial:%s", strconv.FormatBool(partialTraces > 0))}, 1)
Expand Down