Skip to content

Commit

Permalink
Better bundle config and error management
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington committed Oct 28, 2020
1 parent 402639d commit 099ee28
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 68 deletions.
25 changes: 25 additions & 0 deletions exporter/trace/cloudtrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ type options struct {
// Optional. Default value is 50.
BundleCountThreshold int

// BundleByteThreshold is the number of bytes that can be buffered before
// batch uploading them to the backend.
// Optional. Default value is 15KB.
BundleByteThreshold int

// BundleByteLimit is the maximum size of a bundle, in bytes. Zero means unlimited.
// Optional. Default value is unlimited.
BundleByteLimit int

// BufferMaxBytes is the maximum size (in bytes) of spans that
// will be buffered in memory before being dropped.
//
Expand Down Expand Up @@ -167,6 +176,22 @@ func WithBundleCountThreshold(bundleCountThreshold int) func(o *options) {
}
}

// WithBundleByteThreshold sets the number of bytes that can be buffered before
// batch uploading them to the backend.
func WithBundleByteThreshold(bundleByteThreshold int) func(o *options) {
return func(o *options) {
o.BundleByteThreshold = bundleByteThreshold
}
}

// WithBundleByteLimit sets the maximum size of a bundle, in bytes. Zero means
// unlimited.
func WithBundleByteLimit(bundleByteLimit int) func(o *options) {
return func(o *options) {
o.BundleByteLimit = bundleByteLimit
}
}

// WithBufferMaxBytes sets the maximum size (in bytes) of spans that will
// be buffered in memory before being dropped
func WithBufferMaxBytes(bufferMaxBytes int) func(o *options) {
Expand Down
121 changes: 87 additions & 34 deletions exporter/trace/cloudtrace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package trace_test
package trace

import (
"context"
"log"
"os"
"regexp"
"sync"
"testing"
Expand All @@ -26,8 +28,6 @@ import (
"github.com/googleinterns/cloud-operations-api-mock/cloudmock"
"github.com/stretchr/testify/assert"

texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"

"go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -43,12 +43,12 @@ func TestExporter_ExportSpan(t *testing.T) {
clientOpt := []option.ClientOption{option.WithGRPCConn(mock.ClientConn())}

// Create Google Cloud Trace Exporter
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option{
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
_, flush, err := InstallNewPipeline(
[]Option{
WithProjectID("PROJECT_ID_NOT_REAL"),
WithTraceClientOptions(clientOpt),
// handle bundle as soon as span is received
texporter.WithBundleCountThreshold(1),
WithBundleCountThreshold(1),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
Expand Down Expand Up @@ -82,12 +82,12 @@ func TestExporter_DisplayNameFormatter(t *testing.T) {
}

// Create Google Cloud Trace Exporter
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option{
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleCountThreshold(1),
texporter.WithDisplayNameFormatter(format),
_, flush, err := InstallNewPipeline(
[]Option{
WithProjectID("PROJECT_ID_NOT_REAL"),
WithTraceClientOptions(clientOpt),
WithBundleCountThreshold(1),
WithDisplayNameFormatter(format),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
Expand All @@ -112,14 +112,14 @@ func TestExporter_Timeout(t *testing.T) {
var exportErrors []error

// Create Google Cloud Trace Exporter
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option{
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithTimeout(1 * time.Millisecond),
_, flush, err := InstallNewPipeline(
[]Option{
WithProjectID("PROJECT_ID_NOT_REAL"),
WithTraceClientOptions(clientOpt),
WithTimeout(1 * time.Millisecond),
// handle bundle as soon as span is received
texporter.WithBundleCountThreshold(1),
texporter.WithOnError(func(err error) {
WithBundleCountThreshold(1),
WithOnError(func(err error) {
exportErrors = append(exportErrors, err)
}),
},
Expand Down Expand Up @@ -153,12 +153,12 @@ func TestBundling(t *testing.T) {
ch <- spans
})

_, _, err := texporter.InstallNewPipeline(
[]texporter.Option{
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleDelayThreshold(time.Second / 10),
texporter.WithBundleCountThreshold(10),
_, _, err := InstallNewPipeline(
[]Option{
WithProjectID("PROJECT_ID_NOT_REAL"),
WithTraceClientOptions(clientOpt),
WithBundleDelayThreshold(time.Second / 10),
WithBundleCountThreshold(10),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
Expand Down Expand Up @@ -212,13 +212,13 @@ func TestBundling_ConcurrentExports(t *testing.T) {
workers := 3
spansPerWorker := 50
delay := 2 * time.Second
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option{
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleDelayThreshold(delay),
texporter.WithBundleCountThreshold(spansPerWorker),
texporter.WithMaxNumberOfWorkers(workers),
_, flush, err := InstallNewPipeline(
[]Option{
WithProjectID("PROJECT_ID_NOT_REAL"),
WithTraceClientOptions(clientOpt),
WithBundleDelayThreshold(delay),
WithBundleCountThreshold(spansPerWorker),
WithMaxNumberOfWorkers(workers),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
Expand Down Expand Up @@ -265,3 +265,56 @@ func TestBundling_ConcurrentExports(t *testing.T) {

flush()
}

func TestBundling_Oversized(t *testing.T) {
mock := cloudmock.NewCloudMock()
defer mock.Shutdown()
clientOpt := []option.ClientOption{option.WithGRPCConn(mock.ClientConn())}

var uploaded bool
mock.SetOnUpload(func(ctx context.Context, spans []*tracepb.Span) { uploaded = true })

exp, err := NewExporter(
WithProjectID("PROJECT_ID_NOT_REAL"),
WithTraceClientOptions(clientOpt),
WithBundleByteLimit(1),
)
assert.NoError(t, err)
exp.traceExporter.overflowLogger.delayDur = 100 * time.Millisecond

tp := sdktrace.NewTracerProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exp),
)
global.SetTracerProvider(tp)

buf := &logBuffer{logInputChan: make(chan []byte, 100)}
log.SetOutput(buf)
defer func() {
log.SetOutput(os.Stderr)
}()

go func() {
for i := 0; i < 10; i++ {
_, span := global.TracerProvider().Tracer("test-tracer").Start(context.Background(), "test-span")
span.SetName("a")
span.End()
}
}()

if got, want := <-buf.logInputChan, regexp.MustCompile(`OpenTelemetry Cloud Trace exporter: failed to upload spans: oversized item: 10`); !want.Match([]byte(got)) {
t.Errorf("log: got %s, want %s", got, want)
}
if got, want := uploaded, false; got != want {
t.Errorf("uploaded: got %v, want %v", got, want)
}
}

type logBuffer struct {
logInputChan chan []byte
}

func (lb *logBuffer) Write(b []byte) (n int, err error) {
lb.logInputChan <- b
return len(b), nil
}
100 changes: 66 additions & 34 deletions exporter/trace/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,57 @@ type traceExporter struct {
client *traceclient.Client
}

const defaultBufferedByteLimit = 8 * 1024 * 1024
const defaultBundleDelayThreshold = 2 * time.Second
const defaultBundleCountThreshold = 50
const bundleByteThresholdMultiplier = 300
const bundleByteLimitMultiplier = 1000
const defaultBundleByteThreshold = 15000
const defaultBundleByteLimit = 0
const defaultBufferedByteLimit = 8 * 1024 * 1024

func newTraceExporter(o *options) (*traceExporter, error) {
client, err := traceclient.NewClient(o.Context, o.TraceClientOptions...)
if err != nil {
return nil, fmt.Errorf("stackdriver: couldn't initiate trace client: %v", err)
}
e := &traceExporter{
projectID: o.ProjectID,
client: client,
o: o,
projectID: o.ProjectID,
client: client,
o: o,
overflowLogger: overflowLogger{delayDur: 5 * time.Second},
}
b := bundler.NewBundler((*tracepb.Span)(nil), func(bundle interface{}) {
e.uploadFn(context.Background(), bundle.([]*tracepb.Span))
})

if o.BundleDelayThreshold > 0 {
b.DelayThreshold = o.BundleDelayThreshold
} else {
b.DelayThreshold = defaultBundleDelayThreshold
}

if o.BundleCountThreshold > 0 {
b.BundleCountThreshold = o.BundleCountThreshold
} else {
b.BundleCountThreshold = defaultBundleCountThreshold
}
b.BundleByteThreshold = b.BundleCountThreshold * bundleByteThresholdMultiplier
b.BundleByteLimit = b.BundleCountThreshold * bundleByteLimitMultiplier

if o.BundleByteThreshold > 0 {
b.BundleByteThreshold = o.BundleByteThreshold
} else {
b.BundleByteThreshold = defaultBundleByteThreshold
}

if o.BundleByteLimit > 0 {
b.BundleByteLimit = o.BundleByteLimit
} else {
b.BundleByteLimit = defaultBundleByteLimit
}

if o.BufferMaxBytes > 0 {
b.BufferedByteLimit = o.BufferMaxBytes
} else {
b.BufferedByteLimit = defaultBufferedByteLimit
}

if o.MaxNumberOfWorkers > 0 {
b.HandlerLimit = o.MaxNumberOfWorkers
}
Expand All @@ -91,9 +106,9 @@ func (e *traceExporter) checkBundlerError(err error) {
case nil:
return
case bundler.ErrOversizedItem:
fallthrough
e.overflowLogger.log(true)
case bundler.ErrOverflow:
e.overflowLogger.log()
e.overflowLogger.log(false)
default:
e.o.handleError(err)
}
Expand Down Expand Up @@ -144,38 +159,55 @@ func (e *traceExporter) Flush() {
// overflowLogger ensures that at most one overflow error log message is
// written every 5 seconds.
type overflowLogger struct {
mu sync.Mutex
pause bool
accum int
mu sync.Mutex
pause bool
delayDur time.Duration
bufferErrs int
oversized int
}

func (o *overflowLogger) log(oversized bool) {
o.mu.Lock()
defer o.mu.Unlock()
if !o.pause {
o.delay()
}

if oversized {
o.oversized++
} else {
o.bufferErrs++
}
}

func (o *overflowLogger) delay() {
o.pause = true
time.AfterFunc(5*time.Second, func() {
time.AfterFunc(o.delayDur, func() {
o.mu.Lock()
defer o.mu.Unlock()
switch {
case o.accum == 0:
o.pause = false
case o.accum == 1:
log.Println("OpenTelemetry Cloud Trace exporter: failed to upload span: buffer full")
o.accum = 0
o.delay()
default:
log.Printf("OpenTelemetry Cloud Trace exporter: failed to upload %d spans: buffer full", o.accum)
o.accum = 0
o.delay()
}
o.pause = false
logBufferErrors(o.bufferErrs, o.oversized)
o.bufferErrs = 0
o.oversized = 0
})
}

func (o *overflowLogger) log() {
o.mu.Lock()
defer o.mu.Unlock()
if !o.pause {
log.Println("OpenTelemetry Cloud Trace exporter: failed to upload span: buffer full")
o.delay()
} else {
o.accum++
func logBufferErrors(bufferFull, oversized int) {
if bufferFull == 0 && oversized == 0 {
return
}

var bufferFullMsg, oversizedItemMsg, separator string

if bufferFull > 0 {
bufferFullMsg = fmt.Sprintf("buffer full: %v", bufferFull)
}
if oversized > 0 {
oversizedItemMsg = fmt.Sprintf("oversized item: %v", oversized)
}
if bufferFull > 0 && oversized > 0 {
separator = ", "
}

log.Printf("OpenTelemetry Cloud Trace exporter: failed to upload spans: %s%s%s\n", bufferFullMsg, separator, oversizedItemMsg)
}

0 comments on commit 099ee28

Please sign in to comment.