diff --git a/example/trace/http/client/client.go b/example/trace/http/client/client.go index 0a9737cab..1982ba9de 100644 --- a/example/trace/http/client/client.go +++ b/example/trace/http/client/client.go @@ -22,7 +22,6 @@ import ( "os" "net/http" - "time" "google.golang.org/grpc/codes" @@ -35,30 +34,26 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" ) -func initTracer() { +func initTracer() func() { projectID := os.Getenv("PROJECT_ID") // Create Google Cloud Trace exporter to be able to retrieve // the collected spans. - exporter, err := texporter.NewExporter( - texporter.WithProjectID(projectID), + _, flush, err := texporter.InstallNewPipeline( + []texporter.Option {texporter.WithProjectID(projectID)}, + // For this example code we use sdktrace.AlwaysSample sampler to sample all traces. + // In a production application, use sdktrace.ProbabilitySampler with a desired probability. + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), ) if err != nil { log.Fatal(err) } - - // For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces. - // In a production application, use sdktrace.ProbabilitySampler with a desired probability. - tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithSyncer(exporter)) - if err != nil { - log.Fatal(err) - } - global.SetTraceProvider(tp) + return flush } func main() { - initTracer() + flush := initTracer() + defer flush() tr := global.TraceProvider().Tracer("cloudtrace/example/client") client := http.DefaultClient @@ -92,7 +87,5 @@ func main() { } fmt.Printf("Response Received: %s\n\n\n", body) - fmt.Printf("Waiting for few seconds to export spans ...\n\n") - time.Sleep(10 * time.Second) - fmt.Println("Check traces on Google Cloud Trace") + fmt.Printf("Waiting to export spans ...\n\n") } diff --git a/example/trace/http/server/server.go b/example/trace/http/server/server.go index 1b82387fb..4ac6587c6 100644 --- a/example/trace/http/server/server.go +++ b/example/trace/http/server/server.go @@ -28,30 +28,26 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" ) -func initTracer() { +func initTracer() func() { projectID := os.Getenv("PROJECT_ID") // Create Google Cloud Trace exporter to be able to retrieve // the collected spans. - exporter, err := cloudtrace.NewExporter( - cloudtrace.WithProjectID(projectID), + _, flush, err := cloudtrace.InstallNewPipeline( + []cloudtrace.Option {cloudtrace.WithProjectID(projectID)}, + // For this example code we use sdktrace.AlwaysSample sampler to sample all traces. + // In a production application, use sdktrace.ProbabilitySampler with a desired probability. + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), ) if err != nil { log.Fatal(err) } - - // For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces. - // In a production application, use sdktrace.ProbabilitySampler with a desired probability. - tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithSyncer(exporter)) - if err != nil { - log.Fatal(err) - } - global.SetTraceProvider(tp) + return flush } func main() { - initTracer() + flush := initTracer() + defer flush() tr := global.TraceProvider().Tracer("cloudtrace/example/server") diff --git a/exporter/trace/README.md b/exporter/trace/README.md index 2066bc56a..7712f8a49 100644 --- a/exporter/trace/README.md +++ b/exporter/trace/README.md @@ -21,28 +21,31 @@ Add `github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace` import texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace" ``` -Once you import the trace exporter package, then register the exporter to the application, and start tracing. If you are running in a GCP environment, the exporter will automatically authenticate using the environment's service account. If not, you will need to follow the instruction in [Authentication](#Authentication). +Once you import the trace exporter package, create and install a new export pipeline, and then you can start tracing. If you are running in a GCP environment, the exporter will automatically authenticate using the environment's service account. If not, you will need to follow the instruction in [Authentication](#Authentication). ```go -// Create exporter. +// Create exporter and trace provider pipeline, and register provider. projectID := os.Getenv("GOOGLE_CLOUD_PROJECT") -exporter, err := texporter.NewExporter(texporter.WithProjectID(projectID)) +_, flush, err := texporter.InstallNewPipeline( + []texporter.Option { + texporter.WithProjectID(projectID), + // other optional exporter options + }, + // This example code uses sdktrace.AlwaysSample sampler to sample all traces. + // In a production environment or high QPS setup please use ProbabilitySampler + // set at the desired probability. + // Example: + // sdktrace.WithConfig(sdktrace.Config { + // DefaultSampler: sdktrace.ProbabilitySampler(0.0001), + // }) + sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), + // other optional provider options +) if err != nil { - log.Fatalf("texporter.NewExporter: %v", err) + log.Fatalf("texporter.InstallNewPipeline: %v", err) } -// Create trace provider with the exporter. -// -// By default it uses AlwaysSample() which samples all traces. -// In a production environment or high QPS setup please use -// ProbabilitySampler set at the desired probability. -// Example: -// config := sdktrace.Config{DefaultSampler:sdktrace.ProbabilitySampler(0.0001)} -// tp, err := sdktrace.NewProvider(sdktrace.WithConfig(config), ...) -tp, err := sdktrace.NewProvider(sdktrace.WithSyncer(exporter)) -if err != nil { - log.Fatal(err) -} -global.SetTraceProvider(tp) +// before ending program, wait for all enqueued spans to be exported +defer flush() // Create custom span. tracer := global.TraceProvider().Tracer("example.com/trace") diff --git a/exporter/trace/cloudtrace.go b/exporter/trace/cloudtrace.go index 39affa426..c5be9df6c 100644 --- a/exporter/trace/cloudtrace.go +++ b/exporter/trace/cloudtrace.go @@ -25,8 +25,11 @@ import ( "golang.org/x/oauth2/google" "google.golang.org/api/option" + "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/kv" + apitrace "go.opentelemetry.io/otel/api/trace" export "go.opentelemetry.io/otel/sdk/export/trace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" ) // Option is function type that is passed to the exporter initialization function. @@ -229,6 +232,30 @@ type Exporter struct { traceExporter *traceExporter } +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +func InstallNewPipeline(opts []Option, topts ...sdktrace.ProviderOption) (apitrace.Provider, func(), error) { + tp, flush, err := NewExportPipeline(opts, topts...) + if err != nil { + return nil, nil, err + } + global.SetTraceProvider(tp) + return tp, flush, err +} + +// NewExportPipeline sets up a complete export pipeline with the recommended setup +// for trace provider. Returns provider, flush function, and errors. +func NewExportPipeline(opts []Option, topts ...sdktrace.ProviderOption) (apitrace.Provider, func(), error) { + exporter, err := NewExporter(opts...) + if err != nil { + return nil, nil, err + } + tp, err := sdktrace.NewProvider(append(topts, sdktrace.WithSyncer(exporter))...) + if err != nil { + return nil, nil, err + } + return tp, exporter.traceExporter.Flush, nil +} + // NewExporter creates a new Exporter thats implements trace.Exporter. // // TODO(yoshifumi): add a metrics exporter one the spec definition @@ -273,11 +300,6 @@ func (e *Exporter) ExportSpan(ctx context.Context, sd *export.SpanData) { e.traceExporter.ExportSpan(ctx, sd) } -// ExportSpans exports a slice of SpanData to Stackdriver Trace in batch -func (e *Exporter) ExportSpans(ctx context.Context, sds []*export.SpanData) { - e.traceExporter.ExportSpans(ctx, sds) -} - func (e *Exporter) sdWithDefaultTraceAttributes(sd *export.SpanData) *export.SpanData { newSD := *sd for k, v := range e.traceExporter.o.DefaultTraceAttributes { diff --git a/exporter/trace/cloudtrace_test.go b/exporter/trace/cloudtrace_test.go index a07b64adb..2ef2f56ef 100644 --- a/exporter/trace/cloudtrace_test.go +++ b/exporter/trace/cloudtrace_test.go @@ -98,35 +98,29 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func TestExporter_ExportSpans(t *testing.T) { +func TestExporter_ExportSpan(t *testing.T) { // Initial test precondition mockTrace.spansUploaded = nil mockTrace.delay = 0 // Create Google Cloud Trace Exporter - exp, err := texporter.NewExporter( - texporter.WithProjectID("PROJECT_ID_NOT_REAL"), - texporter.WithTraceClientOptions(clientOpt), - // handle bundle as soon as span is received - texporter.WithBundleCountThreshold(1), - ) - assert.NoError(t, err) - - tp, err := sdktrace.NewProvider( + _, flush, err := texporter.InstallNewPipeline( + []texporter.Option { + texporter.WithProjectID("PROJECT_ID_NOT_REAL"), + texporter.WithTraceClientOptions(clientOpt), + // handle bundle as soon as span is received + texporter.WithBundleCountThreshold(1), + }, sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithBatcher(exp, // add following two options to ensure flush - sdktrace.WithBatchTimeout(1), - sdktrace.WithMaxExportBatchSize(1), - )) + ) assert.NoError(t, err) - global.SetTraceProvider(tp) _, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), "test-span") span.End() assert.True(t, span.SpanContext().IsValid()) // wait exporter to flush - time.Sleep(20 * time.Millisecond) + flush() assert.EqualValues(t, 1, mockTrace.len()) } @@ -141,26 +135,23 @@ func TestExporter_DisplayNameFormatter(t *testing.T) { } // Create Google Cloud Trace Exporter - exp, err := texporter.NewExporter( - texporter.WithProjectID("PROJECT_ID_NOT_REAL"), - texporter.WithTraceClientOptions(clientOpt), - texporter.WithBundleCountThreshold(1), - texporter.WithDisplayNameFormatter(format), - ) - assert.NoError(t, err) - - tp, err := sdktrace.NewProvider( + _, flush, err := texporter.InstallNewPipeline( + []texporter.Option { + texporter.WithProjectID("PROJECT_ID_NOT_REAL"), + texporter.WithTraceClientOptions(clientOpt), + texporter.WithBundleCountThreshold(1), + texporter.WithDisplayNameFormatter(format), + }, sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithSyncer(exp)) + ) assert.NoError(t, err) - global.SetTraceProvider(tp) _, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), spanName) span.End() assert.True(t, span.SpanContext().IsValid()) // wait exporter to flush - time.Sleep(20 * time.Millisecond) + flush() assert.EqualValues(t, 1, mockTrace.len()) assert.EqualValues(t, "TEST_FORMAT" + spanName, mockTrace.spansUploaded[0].DisplayName.Value) } @@ -170,34 +161,29 @@ func TestExporter_Timeout(t *testing.T) { mockTrace.spansUploaded = nil mockTrace.delay = 20 * time.Millisecond var exportErrors []error - ch := make(chan error) // Create Google Cloud Trace Exporter - exp, err := texporter.NewExporter( - texporter.WithProjectID("PROJECT_ID_NOT_REAL"), - texporter.WithTraceClientOptions(clientOpt), - texporter.WithTimeout(1*time.Millisecond), - // handle bundle as soon as span is received - texporter.WithBundleCountThreshold(1), - texporter.WithOnError(func(err error) { - exportErrors = append(exportErrors, err) - ch <- err - }), - ) - assert.NoError(t, err) - - tp, err := sdktrace.NewProvider( + _, flush, err := texporter.InstallNewPipeline( + []texporter.Option { + texporter.WithProjectID("PROJECT_ID_NOT_REAL"), + texporter.WithTraceClientOptions(clientOpt), + texporter.WithTimeout(1*time.Millisecond), + // handle bundle as soon as span is received + texporter.WithBundleCountThreshold(1), + texporter.WithOnError(func(err error) { + exportErrors = append(exportErrors, err) + }), + }, sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithSyncer(exp)) + ) assert.NoError(t, err) - global.SetTraceProvider(tp) _, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), "test-span") span.End() assert.True(t, span.SpanContext().IsValid()) // wait for error to be handled - <-ch + flush() assert.EqualValues(t, 0, mockTrace.len()) if got, want := len(exportErrors), 1; got != want { t.Fatalf("len(exportErrors) = %q; want %q", got, want) @@ -217,21 +203,17 @@ func TestBundling(t *testing.T) { } mockTrace.mu.Unlock() - exporter, err := texporter.NewExporter( - texporter.WithProjectID("PROJECT_ID_NOT_REAL"), - texporter.WithTraceClientOptions(clientOpt), - texporter.WithBundleDelayThreshold(time.Second / 10), - texporter.WithBundleCountThreshold(10), - ) - assert.NoError(t, err) - - tp, err := sdktrace.NewProvider( + _, _, err := texporter.InstallNewPipeline( + []texporter.Option { + texporter.WithProjectID("PROJECT_ID_NOT_REAL"), + texporter.WithTraceClientOptions(clientOpt), + texporter.WithBundleDelayThreshold(time.Second / 10), + texporter.WithBundleCountThreshold(10), + }, sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithSyncer(exporter)) + ) assert.NoError(t, err) - global.SetTraceProvider(tp) - for i := 0; i < 35; i++ { _, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), "test-span") span.End() @@ -281,22 +263,18 @@ func TestBundling_ConcurrentExports(t *testing.T) { workers := 3 spansPerWorker := 50 delay := 2 * time.Second - exporter, err := texporter.NewExporter( - texporter.WithProjectID("PROJECT_ID_NOT_REAL"), - texporter.WithTraceClientOptions(clientOpt), - texporter.WithBundleDelayThreshold(delay), - texporter.WithBundleCountThreshold(spansPerWorker), - texporter.WithMaxNumberOfWorkers(workers), - ) - assert.NoError(t, err) - - tp, err := sdktrace.NewProvider( + _, _, err := texporter.InstallNewPipeline( + []texporter.Option { + texporter.WithProjectID("PROJECT_ID_NOT_REAL"), + texporter.WithTraceClientOptions(clientOpt), + texporter.WithBundleDelayThreshold(delay), + texporter.WithBundleCountThreshold(spansPerWorker), + texporter.WithMaxNumberOfWorkers(workers), + }, sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), - sdktrace.WithSyncer(exporter)) + ) assert.NoError(t, err) - global.SetTraceProvider(tp) - waitCh := make(chan struct{}) wg.Add(workers) diff --git a/exporter/trace/trace.go b/exporter/trace/trace.go index 300b2387e..d4f6e48e3 100644 --- a/exporter/trace/trace.go +++ b/exporter/trace/trace.go @@ -119,18 +119,6 @@ func (e *traceExporter) ExportSpan(ctx context.Context, sd *export.SpanData) { e.checkBundlerError(err) } -// ExportSpans exports a slice of SpanData to Stackdriver Trace in batch -func (e *traceExporter) ExportSpans(ctx context.Context, sds []*export.SpanData) { - pbSpans := make([]*tracepb.Span, len(sds)) - var protoSize int = 0 - for i, sd := range sds { - pbSpans[i] = protoFromSpanData(sd, e.projectID, e.o.DisplayNameFormatter) - protoSize += proto.Size(pbSpans[i]) - } - err := e.bundler.Add(&contextAndSpans{ctx, pbSpans}, protoSize) - e.checkBundlerError(err) -} - // uploadSpans sends a set of spans to Stackdriver. func (e *traceExporter) uploadSpans(ctx context.Context, spans []*tracepb.Span) { req := tracepb.BatchWriteSpansRequest{ @@ -161,6 +149,10 @@ func (e *traceExporter) uploadSpans(ctx context.Context, spans []*tracepb.Span) } } +func (e *traceExporter) Flush() { + e.bundler.Flush() +} + // contextAndSpan stores both a context and spans for use with a bundler. type contextAndSpans struct { ctx context.Context