Skip to content

Commit

Permalink
Merge branch 'main' into shevchenko/remove-agent_psr-when-rule-sampling
Browse files Browse the repository at this point in the history
  • Loading branch information
dianashevchenko committed Apr 30, 2024
2 parents eeffc6e + 155ef2d commit 6fa2b0a
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 100 deletions.
6 changes: 4 additions & 2 deletions contrib/database/sql/example_test.go
Expand Up @@ -108,14 +108,16 @@ func Example_dbmPropagation() {
}

func Example_dbStats() {
sqltrace.Register("postgres", &pq.Driver{})
// Register the driver with the WithDBStats option to enable DBStats metric polling
sqltrace.Register("postgres", &pq.Driver{}, sqltrace.WithDBStats())
// Followed by a call to Open.
db, err := sqltrace.Open("postgres", "postgres://pqgotest:password@localhost/pqgotest?sslmode=disable")

if err != nil {
log.Fatal(err)
}

// Tracing is now enabled. Continue to use the database/sql package as usual
// Tracing and metric polling is now enabled. Metrics will be submitted to Datadog with the prefix `datadog.tracer.sql`
rows, err := db.Query("SELECT name FROM users WHERE age=?", 27)
if err != nil {
log.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/opentelemetry/tracer_test.go
Expand Up @@ -116,7 +116,7 @@ func TestSpanContext(t *testing.T) {
assert.Equal(oteltrace.FlagsSampled, sctx.TraceFlags())
assert.Equal("000000000000000000000000075bcd15", sctx.TraceID().String())
assert.Equal("0000000000000010", sctx.SpanID().String())
assert.Equal("dd=s:2;o:rum;t.usr.id:baz64~~", sctx.TraceState().String())
assert.Equal("dd=s:2;o:rum;p:0000000000000010;t.usr.id:baz64~~", sctx.TraceState().String())
assert.Equal(true, sctx.IsRemote())
}

Expand Down
79 changes: 61 additions & 18 deletions ddtrace/tracer/remote_config.go
Expand Up @@ -9,6 +9,8 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
Expand Down Expand Up @@ -78,24 +80,6 @@ func (t *tags) toMap() *map[string]interface{} {
return &m
}

func (t *tracer) dynamicInstrumentationRCUpdate(u remoteconfig.ProductUpdate) map[string]state.ApplyStatus {
applyStatus := map[string]state.ApplyStatus{}

for k, v := range u {
log.Debug("Received dynamic instrumentation RC configuration for %s\n", k)
applyStatus[k] = state.ApplyStatus{State: state.ApplyStateUnknown}
passFullConfiguration(globalconfig.RuntimeID(), k, string(v))
}

return applyStatus
}

// passFullConfiguration is used as a stable interface to find the configuration in via bpf. Go-DI attaches
// a bpf program to this function and extracts the raw bytes accordingly.
//
//go:noinline
func passFullConfiguration(_, _, _ string) {}

// onRemoteConfigUpdate is a remote config callaback responsible for processing APM_TRACING RC-product updates.
func (t *tracer) onRemoteConfigUpdate(u remoteconfig.ProductUpdate) map[string]state.ApplyStatus {
statuses := map[string]state.ApplyStatus{}
Expand Down Expand Up @@ -200,6 +184,63 @@ func (t *tracer) onRemoteConfigUpdate(u remoteconfig.ProductUpdate) map[string]s
return statuses
}

type dynamicInstrumentationRCProbeConfig struct {
runtimeID string
configPath string
configContent string
}

type dynamicInstrumentationRCState struct {
sync.Mutex
state map[string]dynamicInstrumentationRCProbeConfig
}

var (
diRCState dynamicInstrumentationRCState
initalizeRC sync.Once
)

func (t *tracer) dynamicInstrumentationRCUpdate(u remoteconfig.ProductUpdate) map[string]state.ApplyStatus {
applyStatus := map[string]state.ApplyStatus{}

diRCState.Lock()
for k, v := range u {
log.Debug("Received dynamic instrumentation RC configuration for %s\n", k)
applyStatus[k] = state.ApplyStatus{State: state.ApplyStateUnknown}
diRCState.state[k] = dynamicInstrumentationRCProbeConfig{
runtimeID: globalconfig.RuntimeID(),
configPath: k,
configContent: string(v),
}
}
diRCState.Unlock()
return applyStatus
}

// passProbeConfiguration is used as a stable interface to find the configuration in via bpf. Go-DI attaches
// a bpf program to this function and extracts the raw bytes accordingly.
//
//nolint:all
//go:noinline
func passProbeConfiguration(runtimeID, configPath, configContent string) {}

func initalizeDynamicInstrumentationRemoteConfigState() {
diRCState = dynamicInstrumentationRCState{
state: map[string]dynamicInstrumentationRCProbeConfig{},
}

go func() {
for {
time.Sleep(time.Second * 5)
diRCState.Lock()
for _, v := range diRCState.state {
passProbeConfiguration(v.runtimeID, v.configPath, v.configContent)
}
diRCState.Unlock()
}
}()
}

// startRemoteConfig starts the remote config client.
// It registers the APM_TRACING product with a callback,
// and the LIVE_DEBUGGING product without a callback.
Expand All @@ -215,6 +256,8 @@ func (t *tracer) startRemoteConfig(rcConfig remoteconfig.ClientConfig) error {
dynamicInstrumentationError = remoteconfig.Subscribe("LIVE_DEBUGGING", t.dynamicInstrumentationRCUpdate)
}

initalizeRC.Do(initalizeDynamicInstrumentationRemoteConfigState)

apmTracingError = remoteconfig.Subscribe(
state.ProductAPMTracing,
t.onRemoteConfigUpdate,
Expand Down
1 change: 1 addition & 0 deletions ddtrace/tracer/span.go
Expand Up @@ -737,6 +737,7 @@ const (
keyDecisionMaker = "_dd.p.dm"
keyServiceHash = "_dd.dm.service_hash"
keyOrigin = "_dd.origin"
keyReparentID = "_dd.parent_id"
// keyHostname can be used to override the agent's hostname detection when using `WithHostname`. Not to be confused with keyTracerHostname
// which is set via auto-detection.
keyHostname = "_dd.hostname"
Expand Down
12 changes: 12 additions & 0 deletions ddtrace/tracer/spancontext.go
Expand Up @@ -91,6 +91,17 @@ type spanContext struct {
span *span // reference to the span that hosts this context
errors int32 // number of spans with errors in this trace

// The 16-character hex string of the last seen Datadog Span ID
// this value will be added as the _dd.parent_id tag to spans
// created from this spanContext.
// This value is extracted from the `p` sub-key within the tracestate.
// The backend will use the _dd.parent_id tag to reparent spans in
// distributed traces if they were missing their parent span.
// Missing parent span could occur when a W3C-compliant tracer
// propagated this context, but didn't send any spans to Datadog.
reparentID string
isRemote bool

// the below group should propagate cross-process

traceID traceID
Expand All @@ -112,6 +123,7 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
spanID: span.SpanID,
span: span,
}

context.traceID.SetLower(span.TraceID)
if parent != nil {
context.traceID.SetUpper(parent.traceID.Upper())
Expand Down
29 changes: 29 additions & 0 deletions ddtrace/tracer/textmap.go
Expand Up @@ -327,6 +327,8 @@ func (p *propagatorW3c) propagateTracestate(ctx *spanContext, carrier interface{
ts := w3cCtx.(*spanContext).trace.propagatingTag(tracestateHeader)
priority, _ := ctx.SamplingPriority()
setPropagatingTag(ctx, tracestateHeader, composeTracestate(ctx, priority, ts))
ctx.reparentID = w3cCtx.(*spanContext).reparentID
ctx.isRemote = (w3cCtx.(*spanContext).isRemote)
}

// propagator implements Propagator and injects/extracts span contexts
Expand Down Expand Up @@ -750,13 +752,17 @@ func (*propagatorW3c) injectTextMap(spanCtx ddtrace.SpanContext, writer TextMapW
}
writer.Set(traceparentHeader, fmt.Sprintf("00-%s-%016x-%v", traceID, ctx.spanID, flags))
// if context priority / origin / tags were updated after extraction,
// or if there is a span on the trace
// or the tracestateHeader doesn't start with `dd=`
// we need to recreate tracestate
if ctx.updated ||
(!ctx.isRemote || ctx.isRemote && ctx.trace != nil && ctx.trace.root != nil) ||
(ctx.trace != nil && !strings.HasPrefix(ctx.trace.propagatingTag(tracestateHeader), "dd=")) ||
ctx.trace.propagatingTagsLen() == 0 {
// compose a new value for the tracestate
writer.Set(tracestateHeader, composeTracestate(ctx, p, ctx.trace.propagatingTag(tracestateHeader)))
} else {
// use a cached value for the tracestate (e.g., no updating p: key)
writer.Set(tracestateHeader, ctx.trace.propagatingTag(tracestateHeader))
}
return nil
Expand Down Expand Up @@ -819,6 +825,7 @@ func isValidID(id string) bool {
// composeTracestate creates a tracestateHeader from the spancontext.
// The Datadog tracing library is only responsible for managing the list member with key dd,
// which holds the values of the sampling decision(`s:<value>`), origin(`o:<origin>`),
// the last parent ID of a Datadog span (`p:<parent_id>`),
// and propagated tags prefixed with `t.`(e.g. _dd.p.usr.id:usr_id tag will become `t.usr.id:usr_id`).
func composeTracestate(ctx *spanContext, priority int, oldState string) string {
var b strings.Builder
Expand All @@ -832,6 +839,20 @@ func composeTracestate(ctx *spanContext, priority int, oldState string) string {
strings.ReplaceAll(oWithSub, "=", "~")))
}

// if the context is remote and there is a reparentID, set p as reparentId
// if the context is remote and there is no reparentID, don't set p
// if the context is not remote, set p as context.spanId
// this ID can be used by downstream tracers to set a _dd.parent_id tag
// to allow the backend to reparent orphaned spans if necessary

if ctx.isRemote && ctx.reparentID != "" {
b.WriteString(fmt.Sprintf(";p:%s", ctx.reparentID))
}

if !ctx.isRemote || ctx.isRemote && ctx.trace.root != nil {
b.WriteString(fmt.Sprintf(";p:%016x", ctx.spanID))
}

ctx.trace.iteratePropagatingTags(func(k, v string) bool {
if !strings.HasPrefix(k, "_dd.p.") {
return true
Expand Down Expand Up @@ -880,6 +901,7 @@ func (*propagatorW3c) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
var parentHeader string
var stateHeader string
var ctx spanContext
ctx.isRemote = true
// to avoid parsing tracestate header(s) if traceparent is invalid
if err := reader.ForeachKey(func(k, v string) error {
key := strings.ToLower(k)
Expand Down Expand Up @@ -995,6 +1017,7 @@ func parseTraceparent(ctx *spanContext, header string) error {
// The keys to the “dd“ values have been shortened as follows to save space:
// `sampling_priority` = `s`
// `origin` = `o`
// `last parent` = `p`
// `_dd.p.` prefix = `t.`
func parseTracestate(ctx *spanContext, header string) {
if header == "" {
Expand All @@ -1011,6 +1034,7 @@ func parseTracestate(ctx *spanContext, header string) {
}
ddMembers := strings.Split(group[len("dd="):], ";")
dropDM := false
// indicate that backend could reparent this as a root
for _, member := range ddMembers {
keyVal := strings.SplitN(member, ":", 2)
if len(keyVal) != 2 {
Expand Down Expand Up @@ -1044,6 +1068,11 @@ func parseTracestate(ctx *spanContext, header string) {
ctx.setSamplingPriority(0, samplernames.Unknown)
dropDM = true
}
} else if key == "p" {
if val != "" {
ctx.reparentID = val
}

} else if strings.HasPrefix(key, "t.dm") {
if ctx.trace.hasPropagatingTag(keyDecisionMaker) || dropDM {
continue
Expand Down

0 comments on commit 6fa2b0a

Please sign in to comment.