From f0eee6ad1f26af9c59f4442c24ecc76567d7bde8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dario=20Casta=C3=B1=C3=A9?= Date: Wed, 8 May 2024 16:15:41 +0200 Subject: [PATCH] [v2] ddtrace/tracer: tidy up Tracer interface (#2633) --- ddtrace/mocktracer/mocktracer.go | 7 +-- ddtrace/tracer/globaltracer_test.go | 7 +-- ddtrace/tracer/noop.go | 9 +-- ddtrace/tracer/span.go | 18 ++---- ddtrace/tracer/spancontext.go | 15 ++--- ddtrace/tracer/tracer.go | 89 +++++++++++++++++------------ ddtrace/tracer/tracer_test.go | 10 ++-- 7 files changed, 78 insertions(+), 77 deletions(-) diff --git a/ddtrace/mocktracer/mocktracer.go b/ddtrace/mocktracer/mocktracer.go index 6f5cfc1e96..76ab8d0812 100644 --- a/ddtrace/mocktracer/mocktracer.go +++ b/ddtrace/mocktracer/mocktracer.go @@ -190,7 +190,6 @@ func (t *mocktracer) TracerConf() tracer.TracerConf { return tracer.TracerConf{} } -func (t *mocktracer) SubmitStats(*tracer.Span) {} -func (t *mocktracer) SubmitAbandonedSpan(*tracer.Span, bool) {} -func (t *mocktracer) SubmitChunk(_ any) {} -func (t *mocktracer) Flush() {} +func (t *mocktracer) Submit(*tracer.Span) {} +func (t *mocktracer) SubmitChunk(*tracer.Chunk) {} +func (t *mocktracer) Flush() {} diff --git a/ddtrace/tracer/globaltracer_test.go b/ddtrace/tracer/globaltracer_test.go index c5977bbf92..2b6ee8e580 100644 --- a/ddtrace/tracer/globaltracer_test.go +++ b/ddtrace/tracer/globaltracer_test.go @@ -30,10 +30,9 @@ func (*raceTestTracer) TracerConf() TracerConf { return TracerConf{} } -func (*raceTestTracer) SubmitStats(*Span) {} -func (*raceTestTracer) SubmitAbandonedSpan(*Span, bool) {} -func (*raceTestTracer) SubmitChunk(any) {} -func (*raceTestTracer) Flush() {} +func (*raceTestTracer) Submit(*Span) {} +func (*raceTestTracer) SubmitChunk(*Chunk) {} +func (*raceTestTracer) Flush() {} func TestGlobalTracer(t *testing.T) { // at module initialization, the tracer must be seet diff --git a/ddtrace/tracer/noop.go b/ddtrace/tracer/noop.go index 64dc1e8cb4..f7d7586b2f 100644 --- a/ddtrace/tracer/noop.go +++ b/ddtrace/tracer/noop.go @@ -29,13 +29,10 @@ func (NoopTracer) Inject(_ *SpanContext, _ interface{}) error { return nil } // Stop implements Tracer. func (NoopTracer) Stop() {} -// TODO(kjn v2): These should be removed. They are here temporarily to facilitate -// the shift to the v2 API. func (NoopTracer) TracerConf() TracerConf { return TracerConf{} } -func (NoopTracer) SubmitStats(*Span) {} -func (NoopTracer) SubmitAbandonedSpan(*Span, bool) {} -func (NoopTracer) SubmitChunk(any) {} -func (NoopTracer) Flush() {} +func (NoopTracer) Submit(*Span) {} +func (NoopTracer) SubmitChunk(*Chunk) {} +func (NoopTracer) Flush() {} diff --git a/ddtrace/tracer/span.go b/ddtrace/tracer/span.go index 56ed829915..63f66c566b 100644 --- a/ddtrace/tracer/span.go +++ b/ddtrace/tracer/span.go @@ -625,20 +625,10 @@ func (s *Span) finish(finishTime int64) { keep := true if t := GetGlobalTracer(); t != nil { tc := t.TracerConf() - if !tc.Disabled { - // we have an active tracer - if tc.CanComputeStats && shouldComputeStats(s) { - // the agent supports computed stats - t.SubmitStats(s) - } - if tc.CanDropP0s { - // the agent supports dropping p0's in the client - keep = shouldKeep(s) - } - if tc.DebugAbandonedSpans { - // the tracer supports debugging abandoned spans - t.SubmitAbandonedSpan(s, true) - } + t.Submit(s) + if tc.CanDropP0s { + // the agent supports dropping p0's in the client + keep = shouldKeep(s) } } if keep { diff --git a/ddtrace/tracer/spancontext.go b/ddtrace/tracer/spancontext.go index 4349c0b459..31d74383d2 100644 --- a/ddtrace/tracer/spancontext.go +++ b/ddtrace/tracer/spancontext.go @@ -498,7 +498,7 @@ func (t *trace) finishedOne(s *Span) { return } tc := tr.TracerConf() - setPeerService(s, tr) + setPeerService(s, tc.PeerServiceDefaults, tc.PeerServiceMappings) // attach the _dd.base_service tag only when the globally configured service name is different from the // span service name. @@ -529,7 +529,7 @@ func (t *trace) finishedOne(s *Span) { if len(t.spans) == t.finished { // perform a full flush of all spans t.finishChunk(tr, &Chunk{ - Spans: t.spans, + spans: t.spans, willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))), }) t.spans = nil @@ -560,29 +560,26 @@ func (t *trace) finishedOne(s *Span) { t.setTraceTags(finishedSpans[0]) } t.finishChunk(tr, &Chunk{ - Spans: finishedSpans, + spans: finishedSpans, willSend: decisionKeep == samplingDecision(atomic.LoadUint32((*uint32)(&t.samplingDecision))), }) t.spans = leftoverSpans } func (t *trace) finishChunk(tr Tracer, ch *Chunk) { - //atomic.AddUint32(&tr.spansFinished, uint32(len(ch.spans))) - //tr.pushChunk(ch) tr.SubmitChunk(ch) t.finished = 0 // important, because a buffer can be used for several flushes } // setPeerService sets the peer.service, _dd.peer.service.source, and _dd.peer.service.remapped_from // tags as applicable for the given span. -func setPeerService(s *Span, t Tracer) { - tc := t.TracerConf() +func setPeerService(s *Span, peerServiceDefaults bool, peerServiceMappings map[string]string) { if _, ok := s.meta[ext.PeerService]; ok { // peer.service already set on the span s.setMeta(keyPeerServiceSource, ext.PeerService) } else { // no peer.service currently set spanKind := s.meta[ext.SpanKind] isOutboundRequest := spanKind == ext.SpanKindClient || spanKind == ext.SpanKindProducer - shouldSetDefaultPeerService := isOutboundRequest && tc.PeerServiceDefaults + shouldSetDefaultPeerService := isOutboundRequest && peerServiceDefaults if !shouldSetDefaultPeerService { return } @@ -595,7 +592,7 @@ func setPeerService(s *Span, t Tracer) { } // Overwrite existing peer.service value if remapped by the user ps := s.meta[ext.PeerService] - if to, ok := tc.PeerServiceMappings[ps]; ok { + if to, ok := peerServiceMappings[ps]; ok { s.setMeta(keyPeerServiceRemappedFrom, ps) s.setMeta(ext.PeerService, to) } diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index c3c5e35cc8..3c106644e0 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -62,21 +62,23 @@ type Tracer interface { // Inject injects a span context into the given carrier. Inject(context *SpanContext, carrier interface{}) error - // Stop stops the tracer. Calls to Stop should be idempotent. - Stop() + // Submit submits a span to the tracer. + Submit(s *Span) + + // SubmitChunk submits a trace chunk to the tracer. + SubmitChunk(c *Chunk) - // TODO(kjn v2): These can be removed / consolidated. These are - // here temporarily as we figure out a sensible API. + // TracerConf returns a snapshot of the current configuration of the tracer. TracerConf() TracerConf - SubmitStats(*Span) - SubmitAbandonedSpan(*Span, bool) - SubmitChunk(any) // This is a horrible signature. This will eventually become SubmitChunk(Chunk) - Flush() // Synchronous flushing + // Flush flushes any buffered traces. Flush is in effect only if a tracer + // is started. Users do not have to call Flush in order to ensure that + // traces reach Datadog. It is a convenience method dedicated to specific + // use cases. + Flush() - // TODO(kjn v2): Not sure if this belongs in the tracer. - // May be better to have a separate stats counting package / type. - // Signal(Event) + // Stop stops the tracer. Calls to Stop should be idempotent. + Stop() } var _ Tracer = (*tracer)(nil) @@ -400,7 +402,7 @@ func (t *tracer) worker(tick <-chan time.Time) { select { case trace := <-t.out: t.sampleChunk(trace) - t.traceWriter.add(trace.Spans) + t.traceWriter.add(trace.spans) case <-tick: t.statsd.Incr("datadog.tracer.flush_triggered", []string{"reason:scheduled"}, 1) t.traceWriter.flush() @@ -423,7 +425,7 @@ func (t *tracer) worker(tick <-chan time.Time) { select { case trace := <-t.out: t.sampleChunk(trace) - t.traceWriter.add(trace.Spans) + t.traceWriter.add(trace.spans) default: break loop } @@ -433,19 +435,25 @@ func (t *tracer) worker(tick <-chan time.Time) { } } -// Chunk holds information about a trace Chunk to be flushed, including its spans. -// The Chunk may be a fully finished local trace Chunk, or only a portion of the local trace Chunk in the case of +// Chunk holds information about a trace chunk to be flushed, including its spans. +// The chunk may be a fully finished local trace chunk, or only a portion of the local trace chunk in the case of // partial flushing. type Chunk struct { - // TODO:(kjn v2) Should probably not be public, or be a different type. - Spans []*Span + spans []*Span willSend bool // willSend indicates whether the trace will be sent to the agent. } +func NewChunk(spans []*Span, willSend bool) *Chunk { + return &Chunk{ + spans: spans, + willSend: willSend, + } +} + // sampleChunk applies single-span sampling to the provided trace. func (t *tracer) sampleChunk(c *Chunk) { - if len(c.Spans) > 0 { - if p, ok := c.Spans[0].context.SamplingPriority(); ok && p > 0 { + if len(c.spans) > 0 { + if p, ok := c.spans[0].context.SamplingPriority(); ok && p > 0 { // The trace is kept, no need to run single span sampling rules. return } @@ -453,12 +461,12 @@ func (t *tracer) sampleChunk(c *Chunk) { var kept []*Span if t.rulesSampling.HasSpanRules() { // Apply sampling rules to individual spans in the trace. - for _, span := range c.Spans { + for _, span := range c.spans { if t.rulesSampling.SampleSpan(span) { kept = append(kept, span) } } - if len(kept) > 0 && len(kept) < len(c.Spans) { + if len(kept) > 0 && len(kept) < len(c.spans) { // Some spans in the trace were kept, so a partial trace will be sent. tracerstats.Signal(tracerstats.PartialTraces, 1) } @@ -466,22 +474,14 @@ func (t *tracer) sampleChunk(c *Chunk) { if len(kept) == 0 { tracerstats.Signal(tracerstats.DroppedP0Traces, 1) } - tracerstats.Signal(tracerstats.DroppedP0Spans, uint32(len(c.Spans)-len(kept))) + tracerstats.Signal(tracerstats.DroppedP0Spans, uint32(len(c.spans)-len(kept))) if !c.willSend { - c.Spans = kept + c.spans = kept } } -func (t *tracer) SubmitChunk(c any) { - // TODO(kjn v2): This will be unified with pushChunk, and only one function will exist. - // This is here right now because of the lack of appropriate exported types. - - ch := c.(*Chunk) // TODO(kjn v2): This can panic. Once the appropriate types are moved, this assertion will be removed. - t.pushChunk(ch) -} - func (t *tracer) pushChunk(trace *Chunk) { - tracerstats.Signal(tracerstats.SpansFinished, uint32(len(trace.Spans))) + tracerstats.Signal(tracerstats.SpansFinished, uint32(len(trace.spans))) select { case <-t.stop: return @@ -490,7 +490,7 @@ func (t *tracer) pushChunk(trace *Chunk) { select { case t.out <- trace: default: - log.Error("payload queue full, dropping %d traces", len(trace.Spans)) + log.Error("payload queue full, dropping %d traces", len(trace.spans)) } } @@ -776,7 +776,22 @@ func (t *tracer) TracerConf() TracerConf { } } -func (t *tracer) SubmitStats(s *Span) { +func (t *tracer) Submit(s *Span) { + tc := t.TracerConf() + if !tc.Disabled { + // we have an active tracer + if tc.CanComputeStats && shouldComputeStats(s) { + // the agent supports computed stats + t.submitStats(s) + } + if tc.DebugAbandonedSpans { + // the tracer supports debugging abandoned spans + t.submitAbandonedSpan(s, true) + } + } +} + +func (t *tracer) submitStats(s *Span) { select { case t.stats.In <- newAggregableSpan(s, t.obfuscator): // ok @@ -785,7 +800,7 @@ func (t *tracer) SubmitStats(s *Span) { } } -func (t *tracer) SubmitAbandonedSpan(s *Span, finished bool) { +func (t *tracer) submitAbandonedSpan(s *Span, finished bool) { select { case t.abandonedSpansDebugger.In <- newAbandonedSpanCandidate(s, finished): // ok @@ -794,6 +809,10 @@ func (t *tracer) SubmitAbandonedSpan(s *Span, finished bool) { } } +func (t *tracer) SubmitChunk(c *Chunk) { + t.pushChunk(c) +} + // sampleRateMetricKey is the metric key holding the applied sample rate. Has to be the same as the Agent. const sampleRateMetricKey = "_sample_rate" diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 4ac3811eec..5a482e7f77 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -196,7 +196,7 @@ func TestTracerStart(t *testing.T) { Start() // ensure at least one worker started and handles requests - GetGlobalTracer().(*tracer).pushChunk(&Chunk{Spans: []*Span{}}) + GetGlobalTracer().(*tracer).pushChunk(&Chunk{spans: []*Span{}}) Stop() Stop() @@ -208,7 +208,7 @@ func TestTracerStart(t *testing.T) { tr, _, _, stop, err := startTestTracer(t) assert.Nil(t, err) defer stop() - tr.pushChunk(&Chunk{Spans: []*Span{}}) // blocks until worker is started + tr.pushChunk(&Chunk{spans: []*Span{}}) // blocks until worker is started select { case <-tr.stop: t.Fatal("stopped channel should be open") @@ -1615,16 +1615,16 @@ func TestPushTrace(t *testing.T) { resource: "/foo", }, } - tracer.pushChunk(&Chunk{Spans: trace}) + tracer.pushChunk(&Chunk{spans: trace}) assert.Len(tracer.out, 1) t0 := <-tracer.out - assert.Equal(&Chunk{Spans: trace}, t0) + assert.Equal(&Chunk{spans: trace}, t0) many := payloadQueueSize + 2 for i := 0; i < many; i++ { - tracer.pushChunk(&Chunk{Spans: make([]*Span, i)}) + tracer.pushChunk(&Chunk{spans: make([]*Span, i)}) } assert.Len(tracer.out, payloadQueueSize) log.Flush()