Skip to content
This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
trace: add trace exporter. (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbr committed Jun 15, 2018
1 parent 8e26615 commit c75c27a
Show file tree
Hide file tree
Showing 13 changed files with 2,002 additions and 9 deletions.
48 changes: 40 additions & 8 deletions datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,63 @@ import (

"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)

const (
defaultEndpoint = "localhost:8125"
var (
_ view.Exporter = (*Exporter)(nil)
_ trace.Exporter = (*Exporter)(nil)
)

// Exporter exports stats to Datadog.
type Exporter struct {
statsExporter *statsExporter
*statsExporter
*traceExporter
}

// ExportView exports to Datadog if view data has one or more rows.
// ExportView implements view.Exporter.
func (e *Exporter) ExportView(vd *view.Data) {
if len(vd.Rows) == 0 {
return
}
e.statsExporter.addViewData(vd)
}

// ExportSpan implements trace.Exporter.
func (e *Exporter) ExportSpan(s *trace.SpanData) {
e.traceExporter.exportSpan(s)
}

// Stop cleanly stops the exporter, flushing any remaining spans to the transport and
// reporting any errors. Make sure to always call Stop at the end of your program in
// order to not lose any tracing data. Only call Stop once per exporter. Repeated calls
// will cause panic.
func (e *Exporter) Stop() {
e.traceExporter.stop()
}

// Options contains options for configuring the exporter.
type Options struct {
Namespace string // Namespace specifies the namespace to which metrics are appended.
StatsAddr string // Endpoint for DogStatsD
OnError func(err error) // OnError will be called in the case of an error while uploading the stats.
Tags []string // Tags specifies a set of global tags to attach to each metric.
// Namespace specifies the namespaces to which metric keys are appended.
Namespace string

// Service specifies the service name used for tracing.
Service string

// TraceAddr specifies the host[:port] address of the Datadog Trace Agent.
// It defaults to localhost:8126.
TraceAddr string

// StatsAddr specifies the host[:port] address for DogStatsD. It defaults
// to localhost:8125.
StatsAddr string

// OnError specifies a function that will be called if an error occurs during
// processing stats or metrics.
OnError func(err error)

// Tags specifies a set of global tags to attach to each metric.
Tags []string
}

func (o *Options) onError(err error) {
Expand All @@ -53,6 +84,7 @@ func (o *Options) onError(err error) {
func NewExporter(o Options) *Exporter {
return &Exporter{
statsExporter: newStatsExporter(o),
traceExporter: newTraceExporter(o),
}
}

Expand Down
163 changes: 163 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadog.com/).
// Copyright 2018 Datadog, Inc.

package datadog

import (
"errors"
"fmt"
"log"
"strings"
"sync"
"time"
)

const (
// defaultErrorLimit specifies the maximum number of occurrences that will
// be recorded for an error of a certain type.
defaultErrorLimit = 50

// defaultErrorFreq specifies the default frequency at which errors will
// be reported.
defaultErrorFreq = 5 * time.Second
)

// errorType specifies the error type.
type errorType int

const (
// errorTypeEncoding specifies that an encoding error has occurred.
errorTypeEncoding errorType = iota

// errorTypeOverflow specifies that the in channel capacity has been reached.
errorTypeOverflow

// errorTypeTransport specifies that an error occurred while trying
// to upload spans to the agent.
errorTypeTransport

// errorTypeUnknown specifies that an unknown error type was reported.
errorTypeUnknown
)

// errorTypeStrings maps error types to their human-readable description.
var errorTypeStrings = map[errorType]string{
errorTypeEncoding: "encoding error",
errorTypeOverflow: "span buffer overflow",
errorTypeTransport: "transport error",
errorTypeUnknown: "error",
}

// String implements fmt.Stringer.
func (et errorType) String() string { return errorTypeStrings[et] }

// errorAmortizer amortizes high frequency errors and condenses them into
// periodical reports to avoid flooding.
type errorAmortizer struct {
interval time.Duration // frequency of report
callback func(error) // error handler; defaults to log.Println

mu sync.RWMutex // guards below fields
pausing bool
errs map[errorType]*aggregateError
}

// newErrorAmortizer creates a new errorAmortizer which calls the provided function
// at the given interval, passing it a detailed error report if one has occurred.
func newErrorAmortizer(interval time.Duration, cb func(error)) *errorAmortizer {
if cb == nil {
cb = func(err error) {
log.Println(err)
}
}
return &errorAmortizer{
interval: interval,
callback: cb,
errs: make(map[errorType]*aggregateError),
}
}

// flush flushes any aggregated errors and resets the amortizer.
func (e *errorAmortizer) flush() {
e.mu.Lock()
defer e.mu.Unlock()
n := len(e.errs)
if n == 0 {
return
}
var str strings.Builder
str.WriteString("Datadog Exporter error: ")
for _, err := range e.errs {
if n > 1 {
str.WriteString("\n\t")
}
str.WriteString(err.Error())
}
e.callback(errors.New(str.String()))
e.errs = make(map[errorType]*aggregateError)
e.pausing = false
}

// limitReached returns true if the defaultErrorLimit has been reached
// for the given error type.
func (e *errorAmortizer) limitReached(typ errorType) bool {
e.mu.RLock()
defer e.mu.RUnlock()
return e.errs[typ] != nil && e.errs[typ].num > defaultErrorLimit-1
}

// log logs an error of the given type, having the given message. err
// is optional and can be nil.
func (e *errorAmortizer) log(typ errorType, err error) {
if e.limitReached(typ) {
// avoid too much lock contention
return
}
e.mu.Lock()
defer e.mu.Unlock()
if _, ok := e.errs[typ]; !ok {
e.errs[typ] = newError(typ, err)
} else {
e.errs[typ].num++
}
if !e.pausing {
e.pausing = true
time.AfterFunc(e.interval, e.flush)
}
}

var _ error = (*aggregateError)(nil)

// aggregateError is an error consisting of a type and an optional context
// error. It is used to aggregate errors inside the errorAmortizer.
type aggregateError struct {
typ errorType // error type
err error // error message (optional)
num int // number of occurrences
}

// newError creates a new aggregateError.
func newError(t errorType, err error) *aggregateError {
return &aggregateError{t, err, 1}
}

// Error implements the error interface. If the error occurred more than
// once, it appends the number of occurrences to the error message.
func (e *aggregateError) Error() string {
var str strings.Builder
if e.err == nil {
str.WriteString(e.typ.String())
} else {
// no need to include the type into the message, it will be evident
// from the message itself.
str.WriteString(e.err.Error())
}
if e.num >= defaultErrorLimit {
str.WriteString(fmt.Sprintf(" (x%d+)", defaultErrorLimit))
} else if e.num > 1 {
str.WriteString(fmt.Sprintf(" (x%d)", e.num))
}
return str.String()
}
133 changes: 133 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadog.com/).
// Copyright 2018 Datadog, Inc.

package datadog

import (
"errors"
"fmt"
"strings"
"sync"
"testing"
"time"
)

const waitTime = 10 * time.Millisecond

func containsFunc(t *testing.T) func(a error, b string) {
return func(a error, b string) {
if !strings.Contains(a.Error(), b) {
t.Fatalf("%q did not contain %q", a.Error(), b)
}
}
}

func TestErrorAmortizer(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

t.Run("same", func(t *testing.T) {
ma := newTestErrorAmortizer()
for i := 0; i < 10; i++ {
ma.log(errorTypeOverflow, errors.New("buffer full"))
}
time.Sleep(waitTime + 10*time.Millisecond)
out := ma.lastError()
if out == nil {
t.Fatal("no error")
}
contains := containsFunc(t)
contains(out, "Datadog Exporter error:")
contains(out, "(x10)")
contains(out, "buffer full")
})

t.Run("contention", func(t *testing.T) {
ma := newTestErrorAmortizer()
for i := 0; i < defaultErrorLimit+10; i++ {
ma.log(errorTypeOverflow, nil)
}
time.Sleep(waitTime + 10*time.Millisecond)
out := ma.lastError()
if out == nil {
t.Fatal("no error")
}
containsFunc(t)(out, fmt.Sprintf("Datadog Exporter error: span buffer overflow (x%d+)", defaultErrorLimit))
})

t.Run("various", func(t *testing.T) {
ma := newTestErrorAmortizer()
for j := 0; j < 2; j++ {
ma.reset()
for i := 0; i < 2; i++ {
ma.log(errorTypeOverflow, nil)
}
for i := 0; i < 5; i++ {
ma.log(errorTypeTransport, errors.New("transport failed"))
}
for i := 0; i < 3; i++ {
ma.log(errorTypeEncoding, errors.New("encoding error"))
}
ma.log(errorTypeUnknown, errors.New("unknown error"))
time.Sleep(waitTime + 10*time.Millisecond)
out := ma.lastError()
if out == nil {
t.Fatal("no error")
}
contains := containsFunc(t)
contains(out, "Datadog Exporter error:")
contains(out, "span buffer overflow (x2)")
contains(out, "transport failed (x5)")
contains(out, "encoding error (x3)")
contains(out, "unknown error")
}
})

t.Run("one", func(t *testing.T) {
ma := newTestErrorAmortizer()
ma.log(errorTypeUnknown, errors.New("some error"))
time.Sleep(waitTime + 10*time.Millisecond)
out := ma.lastError()
if out == nil {
t.Fatal("no error")
}
contains := containsFunc(t)
contains(out, "Datadog Exporter error:")
contains(out, "some error")
})
}

type testErrorAmortizer struct {
*errorAmortizer

mu sync.RWMutex // guards lastErr
lastErr error
}

func (ma *testErrorAmortizer) lastError() error {
ma.mu.RLock()
defer ma.mu.RUnlock()
return ma.lastErr
}

func (ma *testErrorAmortizer) reset() {
ma.mu.RLock()
defer ma.mu.RUnlock()
ma.lastErr = nil
}

func (ma *testErrorAmortizer) captureError(err error) {
ma.mu.Lock()
defer ma.mu.Unlock()
ma.lastErr = err
}

func newTestErrorAmortizer() *testErrorAmortizer {
ea := newErrorAmortizer(waitTime, nil)
ma := &testErrorAmortizer{errorAmortizer: ea}
ma.errorAmortizer.callback = ma.captureError
return ma
}
Loading

0 comments on commit c75c27a

Please sign in to comment.