
[sampling] add distributed tracing capabilities #310

Merged
merged 3 commits on Oct 19, 2017
97 changes: 70 additions & 27 deletions agent/agent.go
@@ -15,8 +15,11 @@ import (
"github.com/DataDog/datadog-trace-agent/watchdog"
)

const processStatsInterval = time.Minute
const languageHeaderKey = "X-Datadog-Reported-Languages"
const (
processStatsInterval = time.Minute
languageHeaderKey = "X-Datadog-Reported-Languages"
samplingPriorityKey = "_sampling_priority_v1"
Member Author commented:

This is only for v1, private and for internal usage until we have OT support with the final sampling.priority tag. Also, this is a metric for now.

)
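For illustration, here is a minimal sketch of how a client-side tracer could set this metric on the root span so that Process routes the trace to the priority engine. The Span type and setSamplingPriority helper below are hypothetical stand-ins (only the Metrics map mirrors what Process reads from model.Span), not the actual tracer or model API:

```go
package main

import "fmt"

// Span is an illustrative stand-in for the subset of model.Span used by
// Process: numeric metrics stored in a map, with the sampling priority
// as one entry. It is not the real model package.
type Span struct {
	Service string
	Name    string
	Metrics map[string]float64
}

// samplingPriorityKey matches the constant introduced in this diff.
const samplingPriorityKey = "_sampling_priority_v1"

// setSamplingPriority attaches the priority metric to the root span; the
// presence of this key is what makes Process pick the PriorityEngine
// instead of the ScoreEngine.
func setSamplingPriority(root *Span, priority float64) {
	if root.Metrics == nil {
		root.Metrics = make(map[string]float64)
	}
	root.Metrics[samplingPriorityKey] = priority
}

func main() {
	root := &Span{Service: "web", Name: "http.request"}
	// Process only checks the key's presence and whether the value is 0.
	setSamplingPriority(root, 1)
	fmt.Println(root.Metrics[samplingPriorityKey])
}
```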

type processedTrace struct {
Trace model.Trace
@@ -34,14 +37,16 @@ func (pt *processedTrace) weight() float64 {

// Agent struct holds all the sub-routine structs and makes the data flow between them
type Agent struct {
Receiver *HTTPReceiver
Concentrator *Concentrator
Filters []filters.Filter
Sampler *Sampler
Writer *Writer
Receiver *HTTPReceiver
Concentrator *Concentrator
Filters []filters.Filter
ScoreEngine *Sampler
PriorityEngine *Sampler
Writer *Writer

// config
conf *config.AgentConfig
conf *config.AgentConfig
dynConf *config.DynamicConfig

// Used to synchronize on a clean exit
exit chan struct{}
@@ -53,26 +58,34 @@ type Agent struct {
func NewAgent(conf *config.AgentConfig) *Agent {
exit := make(chan struct{})

r := NewHTTPReceiver(conf)
dynConf := config.NewDynamicConfig()
r := NewHTTPReceiver(conf, dynConf)
c := NewConcentrator(
conf.ExtraAggregators,
conf.BucketInterval.Nanoseconds(),
)
f := filters.Setup(conf)
s := NewSampler(conf)
ss := NewScoreEngine(conf)
var ps *Sampler
if conf.PrioritySampling {
// Use priority sampling for distributed tracing only if conf says so
ps = NewPriorityEngine(conf, dynConf)
}

w := NewWriter(conf)
w.inServices = r.services

return &Agent{
Receiver: r,
Concentrator: c,
Filters: f,
Sampler: s,
Writer: w,
conf: conf,
exit: exit,
die: die,
Receiver: r,
Concentrator: c,
Filters: f,
ScoreEngine: ss,
PriorityEngine: ps,
Writer: w,
conf: conf,
dynConf: dynConf,
exit: exit,
die: die,
}
}

@@ -92,7 +105,10 @@ func (a *Agent) Run() {

a.Receiver.Run()
a.Writer.Run()
a.Sampler.Run()
a.ScoreEngine.Run()
if a.PriorityEngine != nil {
a.PriorityEngine.Run()
}

for {
select {
Expand All @@ -112,7 +128,13 @@ func (a *Agent) Run() {
}()
go func() {
defer watchdog.LogOnPanic()
p.Traces = a.Sampler.Flush()
// Serializing both flushes, classic agent sampler and distributed sampler.
// In most cases only one will be used, so in the mainstream case there should
// be no performance issue; only in transitional mode can both contain data.
p.Traces = a.ScoreEngine.Flush()
if a.PriorityEngine != nil {
p.Traces = append(p.Traces, a.PriorityEngine.Flush()...)
}
wg.Done()
}()

@@ -126,14 +148,17 @@ func (a *Agent) Run() {
log.Info("exiting")
close(a.Receiver.exit)
a.Writer.Stop()
a.Sampler.Stop()
a.ScoreEngine.Stop()
if a.PriorityEngine != nil {
a.PriorityEngine.Stop()
}
return
}
}
}

// Process is the default work unit that receives a trace, transforms it and
// passes it downstream
// passes it downstream.
func (a *Agent) Process(t model.Trace) {
if len(t) == 0 {
// XXX Should never happen since we reject empty traces during
@@ -143,12 +168,31 @@ func (a *Agent) Process(t model.Trace) {
}

root := t.GetRoot()

// We get the address of the struct holding the stats associated to no tags
ts := a.Receiver.stats.getTagStats(Tags{})

// We choose the sampler dynamically, depending on trace content:
// if it has sampling priority info (whether 0, 1 or more) we respect
// it by using the priority sampler. Else, use the default score sampler.
s := a.ScoreEngine
priorityPtr := &ts.TracesPriorityNone
if a.PriorityEngine != nil {
if priority, ok := root.Metrics[samplingPriorityKey]; ok {
s = a.PriorityEngine

if priority == 0 {
priorityPtr = &ts.TracesPriority0
} else {
priorityPtr = &ts.TracesPriority1
}
}
}
atomic.AddInt64(priorityPtr, 1)
Member Author commented:

Here this looks redundant with received traces, but not quite: traces can actually be dropped for several reasons, some because they were not normalizable (this happens before this point), some because they are outdated (this happens after). All in all, having this metric (which is then exposed with tags depending on whether the priority is none, 0, or non-zero) is very useful to inspect what is going on.
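To make the bucketing concrete, here is a small self-contained sketch of per-priority counters incremented atomically the way Process does above. The priorityStats type and record helper are hypothetical, not the agent's actual tag-stats implementation; only the three buckets (no priority, priority 0, non-zero priority) are taken from the diff:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// priorityStats is a simplified stand-in for the receiver's tag stats: one
// counter per sampling-priority bucket, incremented atomically on the hot
// path and later flushed as tagged metrics.
type priorityStats struct {
	TracesPriorityNone int64
	TracesPriority0    int64
	TracesPriority1    int64
}

// record picks the bucket the same way Process does: no priority metric,
// priority == 0, or anything non-zero.
func (s *priorityStats) record(priority float64, hasPriority bool) {
	ptr := &s.TracesPriorityNone
	if hasPriority {
		if priority == 0 {
			ptr = &s.TracesPriority0
		} else {
			ptr = &s.TracesPriority1
		}
	}
	atomic.AddInt64(ptr, 1)
}

func main() {
	var s priorityStats
	s.record(0, false) // no priority set -> "none" bucket
	s.record(0, true)  // priority 0
	s.record(2, true)  // any non-zero priority
	fmt.Printf("none=%d p0=%d p1+=%d\n",
		atomic.LoadInt64(&s.TracesPriorityNone),
		atomic.LoadInt64(&s.TracesPriority0),
		atomic.LoadInt64(&s.TracesPriority1))
}
```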


if root.End() < model.Now()-2*a.conf.BucketInterval.Nanoseconds() {
log.Errorf("skipping trace with root too far in past, root:%v", *root)

// We get the address of the struct holding the stats associated to the tags
ts := a.Receiver.stats.getTagStats(Tags{})

atomic.AddInt64(&ts.TracesDropped, 1)
atomic.AddInt64(&ts.SpansDropped, int64(len(t)))
return
@@ -160,7 +204,6 @@ func (a *Agent) Process(t model.Trace) {
}

log.Debugf("rejecting trace by filter: %T %v", f, *root)
ts := a.Receiver.stats.getTagStats(Tags{})
atomic.AddInt64(&ts.TracesFiltered, 1)
atomic.AddInt64(&ts.SpansFiltered, int64(len(t)))

@@ -201,7 +244,7 @@ func (a *Agent) Process(t model.Trace) {
}()
go func() {
defer watchdog.LogOnPanic()
a.Sampler.Add(pt)
s.Add(pt)
}()
}
