Skip to content

Commit

Permalink
ddtrace/opentelemetry: add RWMutex to handle concurrent calls to sett…
Browse files Browse the repository at this point in the history
…ers (#2521)
  • Loading branch information
darccio committed Feb 5, 2024
1 parent 92ad226 commit f8310a9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 4 deletions.
12 changes: 11 additions & 1 deletion ddtrace/opentelemetry/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"errors"
"strconv"
"strings"
"sync"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
Expand All @@ -25,7 +26,8 @@ import (
var _ oteltrace.Span = (*span)(nil)

type span struct {
noop.Span // https://pkg.go.dev/go.opentelemetry.io/otel/trace#hdr-API_Implementations
noop.Span // https://pkg.go.dev/go.opentelemetry.io/otel/trace#hdr-API_Implementations
mu sync.RWMutex `msg:"-"` // all fields are protected by this RWMutex
DD tracer.Span
finished bool
attributes map[string]interface{}
Expand All @@ -38,10 +40,14 @@ type span struct {
func (s *span) TracerProvider() oteltrace.TracerProvider { return s.oteltracer.provider }

func (s *span) SetName(name string) {
s.mu.Lock()
defer s.mu.Unlock()
s.attributes[ext.SpanName] = strings.ToLower(name)
}

func (s *span) End(options ...oteltrace.SpanEndOption) {
s.mu.Lock()
defer s.mu.Unlock()
if s.finished {
return
}
Expand Down Expand Up @@ -157,6 +163,8 @@ type statusInfo struct {
// value before (OK > Error > Unset), the code will not be changed.
// The code and description are set once when the span is finished.
func (s *span) SetStatus(code otelcodes.Code, description string) {
s.mu.Lock()
defer s.mu.Unlock()
if code >= s.statusInfo.code {
s.statusInfo = statusInfo{code, description}
}
Expand All @@ -175,6 +183,8 @@ func (s *span) SetStatus(code otelcodes.Code, description string) {
// The list of reserved tags might be extended in the future.
// Any other non-reserved tags will be set as provided.
func (s *span) SetAttributes(kv ...attribute.KeyValue) {
s.mu.Lock()
defer s.mu.Unlock()
for _, kv := range kv {
if k, v := toReservedAttributes(string(kv.Key), kv.Value); k != "" {
s.attributes[k] = v
Expand Down
19 changes: 19 additions & 0 deletions ddtrace/opentelemetry/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,25 @@ func TestSpanTelemetry(t *testing.T) {
telemetryClient.AssertNumberOfCalls(t, "Count", 1)
}

func TestConcurrentSetAttributes(_ *testing.T) {
tp := NewTracerProvider()
otel.SetTracerProvider(tp)
tr := otel.Tracer("")

_, span := tr.Start(context.Background(), "test")
defer span.End()

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
i := i
go func(val int) {
defer wg.Done()
span.SetAttributes(attribute.Float64("workerID", float64(i)))
}(i)
}
}

func BenchmarkOTelApiWithNoTags(b *testing.B) {
testData := struct {
env, srv, op string
Expand Down
7 changes: 4 additions & 3 deletions ddtrace/tracer/textmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,11 @@ func (p *propagator) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapWr
if ctx.origin != "" {
writer.Set(originHeader, ctx.origin)
}
// propagate OpenTracing baggage
for k, v := range ctx.baggage {
ctx.ForeachBaggageItem(func(k, v string) bool {
// Propagate OpenTracing baggage.
writer.Set(p.cfg.BaggagePrefix+k, v)
}
return true
})
if p.cfg.MaxTagsHeaderLen <= 0 {
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,28 @@ func TestTracerBaggageImmutability(t *testing.T) {
assert.Equal("changed!", childContext.baggage["key"])
}

func TestTracerInjectConcurrency(t *testing.T) {
tracer, _, _, stop := startTestTracer(t)
defer stop()
span, _ := StartSpanFromContext(context.Background(), "main")
defer span.Finish()

var wg sync.WaitGroup
for i := 0; i < 500; i++ {
wg.Add(1)
i := i
go func(val int) {
defer wg.Done()
span.SetBaggageItem("val", fmt.Sprintf("%d", val))

traceContext := map[string]string{}
_ = tracer.Inject(span.Context(), TextMapCarrier(traceContext))
}(i)
}

wg.Wait()
}

func TestTracerSpanTags(t *testing.T) {
tracer := newTracer()
defer tracer.Stop()
Expand Down

0 comments on commit f8310a9

Please sign in to comment.