Skip to content

Commit

Permalink
trace: use priority sampler and /v0.4 endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbr committed Dec 28, 2018
1 parent 87095c5 commit a786a57
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 35 deletions.
2 changes: 1 addition & 1 deletion sampler.go
Expand Up @@ -75,7 +75,7 @@ func (ps *prioritySampler) getRate(spn *ddSpan) float64 {

// apply applies sampling priority to the given span. Caller must ensure it is safe
// to modify the span.
func (ps *prioritySampler) apply(spn *ddSpan) {
func (ps *prioritySampler) applyPriority(spn *ddSpan) {
rate := ps.getRate(spn)
if sampledByRate(spn.TraceID, rate) {
spn.Metrics[samplingPriorityKey] = ext.PriorityAutoKeep
Expand Down
4 changes: 2 additions & 2 deletions sampler_test.go
Expand Up @@ -150,12 +150,12 @@ func TestPrioritySampler(t *testing.T) {
testSpan1.Service = "obfuscate.http"
testSpan1.TraceID = math.MaxUint64 - (math.MaxUint64 / 4)

ps.apply(testSpan1)
ps.applyPriority(testSpan1)
assert.EqualValues(ext.PriorityAutoKeep, testSpan1.Metrics[samplingPriorityKey])
assert.EqualValues(0.5, testSpan1.Metrics[samplingPriorityRateKey])

testSpan1.TraceID = math.MaxUint64 - (math.MaxUint64 / 3)
ps.apply(testSpan1)
ps.applyPriority(testSpan1)
assert.EqualValues(ext.PriorityAutoReject, testSpan1.Metrics[samplingPriorityKey])
assert.EqualValues(0.5, testSpan1.Metrics[samplingPriorityRateKey])

Expand Down
2 changes: 1 addition & 1 deletion span.go
Expand Up @@ -54,7 +54,7 @@ func (e *traceExporter) convertSpan(s *trace.SpanData) *ddSpan {
Service: e.opts.Service,
Start: startNano,
Duration: s.EndTime.UnixNano() - startNano,
Metrics: map[string]float64{samplingPriorityKey: ext.PriorityAutoKeep},
Metrics: map[string]float64{},
Meta: map[string]string{},
}
if s.ParentSpanID != (trace.SpanID{}) {
Expand Down
23 changes: 8 additions & 15 deletions span_test.go
Expand Up @@ -53,11 +53,8 @@ var spanPairs = map[string]struct {
Resource: "/a/b",
Start: testStartTime.UnixNano(),
Duration: testEndTime.UnixNano() - testStartTime.UnixNano(),
Metrics: map[string]float64{
"int64": 1,
samplingPriorityKey: ext.PriorityAutoKeep,
},
Service: "my-service",
Metrics: map[string]float64{"int64": 1},
Service: "my-service",
Meta: map[string]string{
"bool": "true",
"str": "abc",
Expand Down Expand Up @@ -89,11 +86,9 @@ var spanPairs = map[string]struct {
Resource: "/a/b",
Start: testStartTime.UnixNano(),
Duration: testEndTime.UnixNano() - testStartTime.UnixNano(),
Metrics: map[string]float64{
samplingPriorityKey: ext.PriorityAutoKeep,
},
Service: "my-service",
Meta: map[string]string{},
Metrics: map[string]float64{},
Service: "my-service",
Meta: map[string]string{},
},
},
"error": {
Expand Down Expand Up @@ -121,11 +116,9 @@ var spanPairs = map[string]struct {
Resource: "/a/b",
Start: testStartTime.UnixNano(),
Duration: testEndTime.UnixNano() - testStartTime.UnixNano(),
Metrics: map[string]float64{
samplingPriorityKey: ext.PriorityAutoKeep,
},
Error: 1,
Service: "my-service",
Metrics: map[string]float64{},
Error: 1,
Service: "my-service",
Meta: map[string]string{
ext.ErrorMsg: "status-msg",
ext.ErrorType: "cancelled",
Expand Down
14 changes: 12 additions & 2 deletions trace.go
Expand Up @@ -7,6 +7,7 @@ package datadog

import (
"bytes"
"io"
"sync"
"time"

Expand Down Expand Up @@ -43,10 +44,11 @@ type traceExporter struct {
opts Options
payload *payload
errors *errorAmortizer
sampler *prioritySampler

// uploadFn specifies the function used for uploading.
// Defaults to (*transport).upload; replaced in tests.
uploadFn func(pkg *bytes.Buffer, count int) error
uploadFn func(pkg *bytes.Buffer, count int) (io.ReadCloser, error)

wg sync.WaitGroup // counts active uploads
in chan *ddSpan
Expand All @@ -57,10 +59,12 @@ func newTraceExporter(o Options) *traceExporter {
if o.Service == "" {
o.Service = defaultService
}
sampler := newPrioritySampler()
e := &traceExporter{
opts: o,
payload: newPayload(),
errors: newErrorAmortizer(defaultErrorFreq, o.OnError),
sampler: sampler,
uploadFn: newTransport(o.TraceAddr).upload,
in: make(chan *ddSpan, inChannelSize),
exit: make(chan struct{}),
Expand All @@ -79,6 +83,9 @@ func (e *traceExporter) loop() {
for {
select {
case span := <-e.in:
if _, ok := span.Metrics[samplingPriorityKey]; !ok {
e.sampler.applyPriority(span)
}
if err := e.payload.add(span); err != nil {
e.errors.log(errorTypeEncoding, err)
}
Expand Down Expand Up @@ -114,8 +121,11 @@ func (e *traceExporter) flush() {
buf := e.payload.buffer()
e.wg.Add(1)
go func() {
if err := e.uploadFn(buf, n); err != nil {
body, err := e.uploadFn(buf, n)
if err != nil {
e.errors.log(errorTypeTransport, err)
} else {
e.sampler.readRatesJSON(body) // do we care about errors?
}
e.wg.Done()
}()
Expand Down
37 changes: 35 additions & 2 deletions trace_test.go
Expand Up @@ -7,12 +7,16 @@ package datadog

import (
"bytes"
"io"
"io/ioutil"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/tinylib/msgp/msgp"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
)

const (
Expand Down Expand Up @@ -82,6 +86,35 @@ func TestTraceExporter(t *testing.T) {
t.Fatalf("expected to flush 1, got %d", len(me.payloads()))
}
})

t.Run("sampler", func(t *testing.T) {
eq := equalFunc(t)
me := newTestTraceExporter(t)
me.exportSpan(spanPairs["error"].oc)

time.Sleep(time.Millisecond) // wait for recv
me.stop()

// sampler is updated after flush
eq(me.sampler.rates["service:db.users,env:"], 0.9)
eq(me.sampler.defaultRate, 0.8)

// got the sent span
payload := me.payloads()
eq(len(payload), 1)
eq(len(payload[0]), 1)
eq(len(payload[0][0]), 1)

// span has sampling priority and rate applied
span1 := payload[0][0][0]
p, ok := span1.Metrics[samplingPriorityKey]
if !ok || !(p == ext.PriorityAutoKeep || p == ext.PriorityAutoReject) {
t.Fatal(p, ok)
}
if v := span1.Metrics[samplingPriorityRateKey]; v != 1 {
t.Fatalf("got %f", v)
}
})
}

// testTraceExporter wraps a traceExporter, recording all flushed payloads.
Expand All @@ -107,13 +140,13 @@ func (me *testTraceExporter) payloads() []ddPayload {
return me.flushed
}

func (me *testTraceExporter) uploadFn(buf *bytes.Buffer, _ int) error {
func (me *testTraceExporter) uploadFn(buf *bytes.Buffer, _ int) (io.ReadCloser, error) {
var ddp ddPayload
if err := msgp.Decode(buf, &ddp); err != nil {
me.t.Fatal(err)
}
me.mu.Lock()
me.flushed = append(me.flushed, ddp)
me.mu.Unlock()
return nil
return ioutil.NopCloser(strings.NewReader(`{"rate_by_service":{"service:,env:":0.8,"service:db.users,env:":0.9}}`)), nil
}
23 changes: 12 additions & 11 deletions transport.go
Expand Up @@ -8,6 +8,7 @@ package datadog
import (
"bytes"
"fmt"
"io"
"net"
"net/http"
"runtime"
Expand Down Expand Up @@ -53,7 +54,7 @@ func newTransport(addr string) *transport {
Timeout: 1 * time.Second,
}
return &transport{
url: fmt.Sprintf("http://%s/v0.3/traces", addr),
url: fmt.Sprintf("http://%s/v0.4/traces", addr),
client: httpclient,
}
}
Expand All @@ -69,32 +70,32 @@ var httpHeaders = map[string]string{
}

// upload sends the given request body to the Datadog agent and assigns the traceCount
// as an HTTP header.
func (t *transport) upload(body *bytes.Buffer, traceCount int) error {
req, err := http.NewRequest("POST", t.url, body)
// as an HTTP header. It returns a non-nil body if it was successful.
func (t *transport) upload(data *bytes.Buffer, traceCount int) (body io.ReadCloser, err error) {
req, err := http.NewRequest("POST", t.url, data)
if err != nil {
return fmt.Errorf("cannot create http request: %v", err)
return nil, fmt.Errorf("cannot create http request: %v", err)
}
for header, value := range httpHeaders {
req.Header.Set(header, value)
}
req.Header.Set("X-Datadog-Trace-Count", strconv.Itoa(traceCount))
req.Header.Set("Content-Length", strconv.Itoa(body.Len()))
req.Header.Set("Content-Length", strconv.Itoa(data.Len()))
response, err := t.client.Do(req)
if err != nil {
return err
return nil, err
}
defer response.Body.Close()
if code := response.StatusCode; code >= 400 {
// error, check the body for context information and
// return a user friendly error
msg := make([]byte, 1000)
n, _ := response.Body.Read(msg)
response.Body.Close()
txt := http.StatusText(code)
if n > 0 {
return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
return nil, fmt.Errorf("%s (Status: %s)", msg[:n], txt)
}
return fmt.Errorf("%s", txt)
return nil, fmt.Errorf("%s", txt)
}
return nil
return response.Body, nil
}
6 changes: 5 additions & 1 deletion transport_test.go
Expand Up @@ -29,7 +29,11 @@ func TestTransport(t *testing.T) {
p.add(span)
}
trans := newTransport("")
err := trans.upload(p.buffer(), len(p.traces))
body, err := trans.upload(p.buffer(), len(p.traces))
if err != nil {
t.Fatal(err)
}
err = newPrioritySampler().readRatesJSON(body)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit a786a57

Please sign in to comment.