Skip to content

Commit

Permalink
Remove bundler in favor of BatchSpanProcessor + to comply w/ Specific…
Browse files Browse the repository at this point in the history
…ation (#149)

* Remove bundler and assocaited config.

* Update install methods to use built-in batch processor and update them to use the (only available) shutdown method instead of flush

* Add configuration options for batch span processor to NewExportPipeline method.

* Fixes from review.
  • Loading branch information
jsuereth committed Mar 5, 2021
1 parent 6aca59c commit f288a1b
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 380 deletions.
8 changes: 4 additions & 4 deletions example/trace/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func initTracer() func() {

// Create Google Cloud Trace exporter to be able to retrieve
// the collected spans.
_, flush, err := texporter.InstallNewPipeline(
_, shutdown, err := texporter.InstallNewPipeline(
[]texporter.Option{texporter.WithProjectID(projectID)},
// For this example code we use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
Expand All @@ -48,12 +48,12 @@ func initTracer() func() {
if err != nil {
log.Fatal(err)
}
return flush
return shutdown
}

func main() {
flush := initTracer()
defer flush()
shutdown := initTracer()
defer shutdown()
tr := otel.Tracer("cloudtrace/example/client")

client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
Expand Down
8 changes: 4 additions & 4 deletions example/trace/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func initTracer() func() {

// Create Google Cloud Trace exporter to be able to retrieve
// the collected spans.
_, flush, err := cloudtrace.InstallNewPipeline(
_, shutdown, err := cloudtrace.InstallNewPipeline(
[]cloudtrace.Option{cloudtrace.WithProjectID(projectID)},
// For this example code we use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
Expand All @@ -42,12 +42,12 @@ func initTracer() func() {
if err != nil {
log.Fatal(err)
}
return flush
return shutdown
}

func main() {
flush := initTracer()
defer flush()
shutdown := initTracer()
defer shutdown()

helloHandler := func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
Expand Down
6 changes: 3 additions & 3 deletions exporter/trace/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func main() {
// Create exporter and trace provider pipeline, and register provider.
_, flush, err := texporter.InstallNewPipeline(
_, shutdown, err := texporter.InstallNewPipeline(
[]texporter.Option {
// optional exporter options
},
Expand All @@ -53,7 +53,7 @@ func main() {
log.Fatalf("texporter.InstallNewPipeline: %v", err)
}
// before ending program, wait for all enqueued spans to be exported
defer flush()
defer shutdown()

// Create custom span.
tracer := otel.TraceProvider().Tracer("example.com/trace")
Expand All @@ -79,7 +79,7 @@ When running code locally, you may need to specify a Google Project ID in additi

```go
projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
_, flush, err := texporter.InstallNewPipeline(
_, shutdown, err := texporter.InstallNewPipeline(
[]texporter.Option {
texporter.WithProjectID(projectID),
// other optional exporter options
Expand Down
149 changes: 31 additions & 118 deletions exporter/trace/cloudtrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,41 +67,14 @@ type options struct {
// Optional.
OnError func(err error)

// MonitoringClientOptions are additional options to be passed
// to the underlying Stackdriver Monitoring API client.
// Optional.
MonitoringClientOptions []option.ClientOption

// TraceClientOptions are additional options to be passed
// to the underlying Stackdriver Trace API client.
// Optional.
TraceClientOptions []option.ClientOption

// BundleDelayThreshold determines the max amount of time
// the exporter can wait before uploading view data or trace spans to
// the backend.
// Optional. Default value is 2 seconds.
BundleDelayThreshold time.Duration

// BundleCountThreshold determines how many view data events or trace spans
// can be buffered before batch uploading them to the backend.
// Optional. Default value is 50.
BundleCountThreshold int

// BundleByteThreshold is the number of bytes that can be buffered before
// batch uploading them to the backend.
// Optional. Default value is 15KB.
BundleByteThreshold int

// BundleByteLimit is the maximum size of a bundle, in bytes. Zero means unlimited.
// Optional. Default value is unlimited.
BundleByteLimit int

// BufferMaxBytes is the maximum size (in bytes) of spans that
// will be buffered in memory before being dropped.
//
// If unset, a default of 8MB will be used.
BufferMaxBytes int
// BatchSpanProcessorOptions are additional options to be based
// to the underlying BatchSpanProcessor when call making a new export pipeline.
BatchSpanProcessorOptions []sdktrace.BatchSpanProcessorOption

// DefaultTraceAttributes will be appended to every span that is exported to
// Stackdriver Trace.
Expand All @@ -118,27 +91,13 @@ type options struct {
// If unset, context.Background() will be used.
Context context.Context

// SkipCMD enforces to skip all the CreateMetricDescriptor calls.
// These calls are important in order to configure the unit of the metrics,
// but in some cases all the exported metrics are builtin (unit is configured)
// or the unit is not important.
SkipCMD bool

// Timeout for all API calls. If not set, defaults to 5 seconds.
Timeout time.Duration

// ReportingInterval sets the interval between reporting metrics.
// If it is set to zero then default value is used.
ReportingInterval time.Duration

// DisplayNameFormatter is a function that produces the display name of a span
// given its SpanSnapshot.
// Optional. Default format for SpanSnapshot s is "Span.{s.SpanKind}-{s.Name}"
DisplayNameFormatter

// MaxNumberOfWorkers sets the maximum number of go rountines that send requests
// to Cloud Trace. The minimum number of workers is 1.
MaxNumberOfWorkers int
}

// WithProjectID sets Google Cloud Platform project as projectID.
Expand All @@ -161,60 +120,13 @@ func WithOnError(onError func(err error)) func(o *options) {
}
}

// WithBundleDelayThreshold sets the max amount of time the exporter can wait before
// uploading trace spans to the backend.
func WithBundleDelayThreshold(bundleDelayThreshold time.Duration) func(o *options) {
return func(o *options) {
o.BundleDelayThreshold = bundleDelayThreshold
}
}

// WithBundleCountThreshold sets how many trace spans can be buffered before batch
// uploading them to the backend.
func WithBundleCountThreshold(bundleCountThreshold int) func(o *options) {
return func(o *options) {
o.BundleCountThreshold = bundleCountThreshold
}
}

// WithBundleByteThreshold sets the number of bytes that can be buffered before
// batch uploading them to the backend.
func WithBundleByteThreshold(bundleByteThreshold int) func(o *options) {
return func(o *options) {
o.BundleByteThreshold = bundleByteThreshold
}
}

// WithBundleByteLimit sets the maximum size of a bundle, in bytes. Zero means
// unlimited.
func WithBundleByteLimit(bundleByteLimit int) func(o *options) {
return func(o *options) {
o.BundleByteLimit = bundleByteLimit
}
}

// WithBufferMaxBytes sets the maximum size (in bytes) of spans that will
// be buffered in memory before being dropped
func WithBufferMaxBytes(bufferMaxBytes int) func(o *options) {
return func(o *options) {
o.BufferMaxBytes = bufferMaxBytes
}
}

// WithTraceClientOptions sets additionial client options for tracing.
func WithTraceClientOptions(opts []option.ClientOption) func(o *options) {
return func(o *options) {
o.TraceClientOptions = opts
}
}

// WithMonitoringClientOptions sets additionial client options for monitoring.
func WithMonitoringClientOptions(opts []option.ClientOption) func(o *options) {
return func(o *options) {
o.MonitoringClientOptions = opts
}
}

// WithContext sets the context that trace exporter and metric exporter
// relies on.
func WithContext(ctx context.Context) func(o *options) {
Expand All @@ -223,14 +135,6 @@ func WithContext(ctx context.Context) func(o *options) {
}
}

// WithMaxNumberOfWorkers sets the number of go routines that send requests
// to the Cloud Trace backend.
func WithMaxNumberOfWorkers(n int) func(o *options) {
return func(o *options) {
o.MaxNumberOfWorkers = n
}
}

// WithTimeout sets the timeout for trace exporter and metric exporter
func WithTimeout(t time.Duration) func(o *options) {
return func(o *options) {
Expand Down Expand Up @@ -267,34 +171,44 @@ type Exporter struct {

// InstallNewPipeline instantiates a NewExportPipeline and registers it globally.
func InstallNewPipeline(opts []Option, topts ...sdktrace.TracerProviderOption) (trace.TracerProvider, func(), error) {
tp, flush, err := NewExportPipeline(opts, topts...)
tp, shutdown, err := NewExportPipeline(opts, topts...)
if err != nil {
return nil, nil, err
}
otel.SetTracerProvider(tp)
return tp, flush, err
return tp, shutdown, err
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup
// for trace provider. Returns provider, flush function, and errors.
// for trace provider. Returns provider, shutdown function, and errors.
func NewExportPipeline(opts []Option, topts ...sdktrace.TracerProviderOption) (trace.TracerProvider, func(), error) {
exporter, err := NewExporter(opts...)
// TODO(suereth): Don't flesh options twice.
o := options{Context: context.Background()}
for _, opt := range opts {
opt(&o)
}
exporter, err := newExporterWithOptions(&o)
if err != nil {
return nil, nil, err
}
tp := sdktrace.NewTracerProvider(append(topts, sdktrace.WithSyncer(exporter))...)
return tp, exporter.traceExporter.Flush, nil
tp := sdktrace.NewTracerProvider(
append(topts,
sdktrace.WithBatcher(exporter, o.BatchSpanProcessorOptions...))...)
return tp, func() {
tp.Shutdown(context.Background())
}, nil
}

// NewExporter creates a new Exporter thats implements trace.Exporter.
//
// TODO(yoshifumi): add a metrics exporter one the spec definition
// process and the sampler implementation are done.
func NewExporter(opts ...Option) (*Exporter, error) {
o := options{Context: context.Background()}
for _, opt := range opts {
opt(&o)
}
return newExporterWithOptions(&o)
}

func newExporterWithOptions(o *options) (*Exporter, error) {
if o.ProjectID == "" {
creds, err := google.FindDefaultCredentials(o.Context, traceapi.DefaultAuthScopes()...)
if err != nil {
Expand All @@ -305,7 +219,7 @@ func NewExporter(opts ...Option) (*Exporter, error) {
}
o.ProjectID = creds.ProjectID
}
te, err := newTraceExporter(&o)
te, err := newTraceExporter(o)
if err != nil {
return nil, err
}
Expand All @@ -324,23 +238,22 @@ func newContextWithTimeout(ctx context.Context, timeout time.Duration) (context.

// ExportSpans exports a SpanSnapshot to Stackdriver Trace.
func (e *Exporter) ExportSpans(ctx context.Context, spanData []*export.SpanSnapshot) error {
for _, sd := range spanData {
if len(e.traceExporter.o.DefaultTraceAttributes) > 0 {
sd = e.sdWithDefaultTraceAttributes(sd)
if len(e.traceExporter.o.DefaultTraceAttributes) > 0 {
converted := make([]*export.SpanSnapshot, len(spanData))
for i, sd := range spanData {
converted[i] = e.sdWithDefaultTraceAttributes((sd))
}
e.traceExporter.ExportSpan(ctx, sd)
return e.traceExporter.ExportSpans(ctx, converted)
}

return nil
return e.traceExporter.ExportSpans(ctx, spanData)
}

// Shutdown waits for exported data to be uploaded.
//
// This is useful if your program is ending and you do not
// want to lose recent spans.
// For our purposes it closed down the client.
func (e *Exporter) Shutdown(ctx context.Context) error {
e.traceExporter.Flush()
return nil
return e.traceExporter.Shutdown(ctx)
}

func (e *Exporter) sdWithDefaultTraceAttributes(sd *export.SpanSnapshot) *export.SpanSnapshot {
Expand Down
Loading

0 comments on commit f288a1b

Please sign in to comment.