diff --git a/ddtrace/tracer/metrics.go b/ddtrace/tracer/metrics.go index 8f1738d724..858b6ae1c6 100644 --- a/ddtrace/tracer/metrics.go +++ b/ddtrace/tracer/metrics.go @@ -99,9 +99,9 @@ func (t *tracer) reportHealthMetrics(interval time.Duration) { for { select { case <-ticker.C: - t.config.statsd.Count("datadog.tracer.spans_started", atomic.SwapInt64(&t.spansStarted, 0), nil, 1) - t.config.statsd.Count("datadog.tracer.spans_finished", atomic.SwapInt64(&t.spansFinished, 0), nil, 1) - t.config.statsd.Count("datadog.tracer.traces_dropped", atomic.SwapInt64(&t.tracesDropped, 0), []string{"reason:trace_too_large"}, 1) + t.config.statsd.Count("datadog.tracer.spans_started", int64(atomic.SwapUint32(&t.spansStarted, 0)), nil, 1) + t.config.statsd.Count("datadog.tracer.spans_finished", int64(atomic.SwapUint32(&t.spansFinished, 0)), nil, 1) + t.config.statsd.Count("datadog.tracer.traces_dropped", int64(atomic.SwapUint32(&t.tracesDropped, 0)), []string{"reason:trace_too_large"}, 1) case <-t.stop: return } diff --git a/ddtrace/tracer/payload.go b/ddtrace/tracer/payload.go index 2107ac0635..352f62cd67 100644 --- a/ddtrace/tracer/payload.go +++ b/ddtrace/tracer/payload.go @@ -36,7 +36,7 @@ type payload struct { off int // count specifies the number of items in the stream. - count uint64 + count uint32 // buf holds the sequence of msgpack-encoded items. buf bytes.Buffer @@ -58,14 +58,14 @@ func (p *payload) push(t spanList) error { if err := msgp.Encode(&p.buf, t); err != nil { return err } - atomic.AddUint64(&p.count, 1) + atomic.AddUint32(&p.count, 1) p.updateHeader() return nil } // itemCount returns the number of items available in the srteam. func (p *payload) itemCount() int { - return int(atomic.LoadUint64(&p.count)) + return int(atomic.LoadUint32(&p.count)) } // size returns the payload size in bytes. After the first read the value becomes @@ -102,7 +102,7 @@ const ( // updateHeader updates the payload header based on the number of items currently // present in the stream. func (p *payload) updateHeader() { - n := atomic.LoadUint64(&p.count) + n := uint64(atomic.LoadUint32(&p.count)) switch { case n <= 15: p.header[7] = msgpackArrayFix + byte(n) diff --git a/ddtrace/tracer/payload_test.go b/ddtrace/tracer/payload_test.go index 73d72ea0af..cc5082eade 100644 --- a/ddtrace/tracer/payload_test.go +++ b/ddtrace/tracer/payload_test.go @@ -10,6 +10,7 @@ import ( "io" "strconv" "strings" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -96,7 +97,7 @@ func benchmarkPayloadThroughput(count int) func(*testing.B) { reset := func() { p.header = make([]byte, 8) p.off = 8 - p.count = 0 + atomic.StoreUint32(&p.count, 0) p.buf.Reset() } for i := 0; i < b.N; i++ { diff --git a/ddtrace/tracer/span.go b/ddtrace/tracer/span.go index 1567ec0015..b46a238276 100644 --- a/ddtrace/tracer/span.go +++ b/ddtrace/tracer/span.go @@ -221,13 +221,13 @@ func (s *span) setTagError(value interface{}, cfg errorConfig) { if yes { if s.Error == 0 { // new error - atomic.AddInt64(&s.context.errors, 1) + atomic.AddInt32(&s.context.errors, 1) } s.Error = 1 } else { if s.Error > 0 { // flip from active to inactive - atomic.AddInt64(&s.context.errors, -1) + atomic.AddInt32(&s.context.errors, -1) } s.Error = 0 } @@ -518,7 +518,7 @@ func shouldKeep(s *span) bool { // positive sampling priorities stay return true } - if atomic.LoadInt64(&s.context.errors) > 0 { + if atomic.LoadInt32(&s.context.errors) > 0 { // traces with any span containing an error get kept return true } diff --git a/ddtrace/tracer/span_test.go b/ddtrace/tracer/span_test.go index acf1ab05ff..1f64ca0365 100644 --- a/ddtrace/tracer/span_test.go +++ b/ddtrace/tracer/span_test.go @@ -106,7 +106,7 @@ func TestSpanFinishTwice(t *testing.T) { func TestShouldDrop(t *testing.T) { for _, tt := range []struct { prio int - errors int64 + errors int32 rate float64 want bool }{ @@ -122,7 +122,7 @@ func TestShouldDrop(t *testing.T) { s := newSpan("", "", "", 1, 1, 0) s.SetTag(ext.SamplingPriority, tt.prio) s.SetTag(ext.EventSampleRate, tt.rate) - atomic.StoreInt64(&s.context.errors, tt.errors) + atomic.StoreInt32(&s.context.errors, tt.errors) assert.Equal(t, shouldKeep(s), tt.want) }) } diff --git a/ddtrace/tracer/spancontext.go b/ddtrace/tracer/spancontext.go index c7ee376c1f..b0e724e277 100644 --- a/ddtrace/tracer/spancontext.go +++ b/ddtrace/tracer/spancontext.go @@ -27,7 +27,7 @@ type spanContext struct { trace *trace // reference to the trace that this span belongs too span *span // reference to the span that hosts this context - errors int64 // number of spans with errors in this trace + errors int32 // number of spans with errors in this trace // the below group should propagate cross-process @@ -36,7 +36,7 @@ type spanContext struct { mu sync.RWMutex // guards below fields baggage map[string]string - hasBaggage int32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists. + hasBaggage uint32 // atomic int for quick checking presence of baggage. 0 indicates no baggage, otherwise baggage exists. origin string // e.g. "synthetics" } @@ -80,7 +80,7 @@ func (c *spanContext) TraceID() uint64 { return c.traceID } // ForeachBaggageItem implements ddtrace.SpanContext. func (c *spanContext) ForeachBaggageItem(handler func(k, v string) bool) { - if atomic.LoadInt32(&c.hasBaggage) == 0 { + if atomic.LoadUint32(&c.hasBaggage) == 0 { return } c.mu.RLock() @@ -110,14 +110,14 @@ func (c *spanContext) setBaggageItem(key, val string) { c.mu.Lock() defer c.mu.Unlock() if c.baggage == nil { - atomic.StoreInt32(&c.hasBaggage, 1) + atomic.StoreUint32(&c.hasBaggage, 1) c.baggage = make(map[string]string, 1) } c.baggage[key] = val } func (c *spanContext) baggageItem(key string) string { - if atomic.LoadInt32(&c.hasBaggage) == 0 { + if atomic.LoadUint32(&c.hasBaggage) == 0 { return "" } c.mu.RLock() @@ -136,7 +136,7 @@ func (c *spanContext) meta(key string) (val string, ok bool) { func (c *spanContext) finish() { c.trace.finishedOne(c.span) } // samplingDecision is the decision to send a trace to the agent or not. -type samplingDecision int64 +type samplingDecision uint32 const ( // decisionNone is the default state of a trace. @@ -208,11 +208,11 @@ func (t *trace) setSamplingPriority(p int, sampler samplernames.SamplerName, rat } func (t *trace) keep() { - atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionKeep)) + atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionKeep)) } func (t *trace) drop() { - atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop)) + atomic.CompareAndSwapUint32((*uint32)(&t.samplingDecision), uint32(decisionNone), uint32(decisionDrop)) } func (t *trace) setTag(key, value string) { @@ -279,7 +279,7 @@ func (t *trace) push(sp *span) { t.spans = nil // GC log.Error("trace buffer full (%d), dropping trace", traceMaxSize) if haveTracer { - atomic.AddInt64(&tr.tracesDropped, 1) + atomic.AddUint32(&tr.tracesDropped, 1) } return } @@ -288,7 +288,7 @@ func (t *trace) push(sp *span) { } t.spans = append(t.spans, sp) if haveTracer { - atomic.AddInt64(&tr.spansStarted, 1) + atomic.AddUint32(&tr.spansStarted, 1) } } @@ -338,9 +338,9 @@ func (t *trace) finishedOne(s *span) { return } // we have a tracer that can receive completed traces. - atomic.AddInt64(&tr.spansFinished, int64(len(t.spans))) + atomic.AddUint32(&tr.spansFinished, uint32(len(t.spans))) tr.pushTrace(&finishedTrace{ spans: t.spans, - decision: samplingDecision(atomic.LoadInt64((*int64)(&t.samplingDecision))), + decision: samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))), }) } diff --git a/ddtrace/tracer/spancontext_test.go b/ddtrace/tracer/spancontext_test.go index 22c7821c77..9987631086 100644 --- a/ddtrace/tracer/spancontext_test.go +++ b/ddtrace/tracer/spancontext_test.go @@ -12,11 +12,11 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames" + + "github.com/stretchr/testify/assert" ) func setupteardown(start, max int) func() { diff --git a/ddtrace/tracer/stats.go b/ddtrace/tracer/stats.go index 99029de556..3c9ee344f0 100644 --- a/ddtrace/tracer/stats.go +++ b/ddtrace/tracer/stats.go @@ -52,7 +52,7 @@ type concentrator struct { buckets map[int64]*rawBucket // stopped reports whether the concentrator is stopped (when non-zero) - stopped uint64 + stopped uint32 wg sync.WaitGroup // waits for any active goroutines bucketSize int64 // the size of a bucket in nanoseconds @@ -79,7 +79,7 @@ func alignTs(ts, bucketSize int64) int64 { return ts - ts%bucketSize } // Start starts the concentrator. A started concentrator needs to be stopped // in order to gracefully shut down, using Stop. func (c *concentrator) Start() { - if atomic.SwapUint64(&c.stopped, 0) == 0 { + if atomic.SwapUint32(&c.stopped, 0) == 0 { // already running log.Warn("(*concentrator).Start called more than once. This is likely a programming error.") return @@ -149,7 +149,7 @@ func (c *concentrator) add(s *aggregableSpan) { // Stop stops the concentrator and blocks until the operation completes. func (c *concentrator) Stop() { - if atomic.SwapUint64(&c.stopped, 1) > 0 { + if atomic.SwapUint32(&c.stopped, 1) > 0 { return } close(c.stop) diff --git a/ddtrace/tracer/stats_test.go b/ddtrace/tracer/stats_test.go index 6c58e657e4..56e3eb029d 100644 --- a/ddtrace/tracer/stats_test.go +++ b/ddtrace/tracer/stats_test.go @@ -6,6 +6,7 @@ package tracer import ( + "sync/atomic" "testing" "time" @@ -60,26 +61,26 @@ func TestConcentrator(t *testing.T) { assert.Nil(c.stop) assert.NotNil(c.buckets) assert.Equal(c.cfg, cfg) - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) }) t.Run("start-stop", func(t *testing.T) { assert := assert.New(t) c := newConcentrator(&config{}, defaultStatsBucketSize) - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) c.Start() - assert.EqualValues(c.stopped, 0) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 0) c.Stop() c.Stop() - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) c.Start() - assert.EqualValues(c.stopped, 0) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 0) c.Start() c.Start() - assert.EqualValues(c.stopped, 0) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 0) c.Stop() c.Stop() - assert.EqualValues(c.stopped, 1) + assert.EqualValues(atomic.LoadUint32(&c.stopped), 1) }) t.Run("valid", func(t *testing.T) { diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index cb29718e5b..0f9e3596bf 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -70,13 +70,13 @@ type tracer struct { // These integers track metrics about spans and traces as they are started, // finished, and dropped - spansStarted, spansFinished, tracesDropped int64 + spansStarted, spansFinished, tracesDropped uint32 // Records the number of dropped P0 traces and spans. - droppedP0Traces, droppedP0Spans uint64 + droppedP0Traces, droppedP0Spans uint32 // partialTrace the number of partially dropped traces. - partialTraces uint64 + partialTraces uint32 // rulesSampling holds an instance of the rules sampler used to apply either trace sampling, // or single span sampling rules on spans. These are user-defined @@ -344,13 +344,13 @@ func (t *tracer) sampleFinishedTrace(info *finishedTrace) { } if len(kept) > 0 && len(kept) < len(info.spans) { // Some spans in the trace were kept, so a partial trace will be sent. - atomic.AddUint64(&t.partialTraces, 1) + atomic.AddUint32(&t.partialTraces, 1) } } if len(kept) == 0 { - atomic.AddUint64(&t.droppedP0Traces, 1) + atomic.AddUint32(&t.droppedP0Traces, 1) } - atomic.AddUint64(&t.droppedP0Spans, uint64(len(info.spans)-len(kept))) + atomic.AddUint32(&t.droppedP0Spans, uint32(len(info.spans)-len(kept))) info.spans = kept } diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 01e4cb3207..d05327d95e 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -353,8 +353,8 @@ func TestSamplingDecision(t *testing.T) { tracer, _, _, stop := startTestTracer(t) defer func() { // Must check these after tracer is stopped to avoid flakiness - assert.Equal(t, uint64(1), tracer.droppedP0Traces) - assert.Equal(t, uint64(2), tracer.droppedP0Spans) + assert.Equal(t, uint32(1), tracer.droppedP0Traces) + assert.Equal(t, uint32(2), tracer.droppedP0Spans) }() defer stop() tracer.config.agent.DropP0s = true diff --git a/ddtrace/tracer/transport.go b/ddtrace/tracer/transport.go index 85da05dd6f..339e61201b 100644 --- a/ddtrace/tracer/transport.go +++ b/ddtrace/tracer/transport.go @@ -151,9 +151,9 @@ func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) { if t.config.canComputeStats() { req.Header.Set("Datadog-Client-Computed-Stats", "yes") } - droppedTraces := int(atomic.SwapUint64(&t.droppedP0Traces, 0)) - partialTraces := int(atomic.SwapUint64(&t.partialTraces, 0)) - droppedSpans := int(atomic.SwapUint64(&t.droppedP0Spans, 0)) + droppedTraces := int(atomic.SwapUint32(&t.droppedP0Traces, 0)) + partialTraces := int(atomic.SwapUint32(&t.partialTraces, 0)) + droppedSpans := int(atomic.SwapUint32(&t.droppedP0Spans, 0)) if stats := t.config.statsd; stats != nil { stats.Count("datadog.tracer.dropped_p0_traces", int64(droppedTraces), []string{fmt.Sprintf("partial:%s", strconv.FormatBool(partialTraces > 0))}, 1)