Skip to content

Commit

Permalink
Added create trace pipeline method (Breaking Change: removes implemen…
Browse files Browse the repository at this point in the history
…tation of "SpanBatcher") (#56)
  • Loading branch information
stevencl1013 committed Jul 8, 2020
1 parent 6a72d80 commit 4bf57be
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 135 deletions.
27 changes: 10 additions & 17 deletions example/trace/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"os"

"net/http"
"time"

"google.golang.org/grpc/codes"

Expand All @@ -35,30 +34,26 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func initTracer() {
func initTracer() func() {
projectID := os.Getenv("PROJECT_ID")

// Create Google Cloud Trace exporter to be able to retrieve
// the collected spans.
exporter, err := texporter.NewExporter(
texporter.WithProjectID(projectID),
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option {texporter.WithProjectID(projectID)},
// For this example code we use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {
log.Fatal(err)
}

// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exporter))
if err != nil {
log.Fatal(err)
}
global.SetTraceProvider(tp)
return flush
}

func main() {
initTracer()
flush := initTracer()
defer flush()
tr := global.TraceProvider().Tracer("cloudtrace/example/client")

client := http.DefaultClient
Expand Down Expand Up @@ -92,7 +87,5 @@ func main() {
}

fmt.Printf("Response Received: %s\n\n\n", body)
fmt.Printf("Waiting for few seconds to export spans ...\n\n")
time.Sleep(10 * time.Second)
fmt.Println("Check traces on Google Cloud Trace")
fmt.Printf("Waiting to export spans ...\n\n")
}
22 changes: 9 additions & 13 deletions example/trace/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,26 @@ import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func initTracer() {
func initTracer() func() {
projectID := os.Getenv("PROJECT_ID")

// Create Google Cloud Trace exporter to be able to retrieve
// the collected spans.
exporter, err := cloudtrace.NewExporter(
cloudtrace.WithProjectID(projectID),
_, flush, err := cloudtrace.InstallNewPipeline(
[]cloudtrace.Option {cloudtrace.WithProjectID(projectID)},
// For this example code we use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
)
if err != nil {
log.Fatal(err)
}

// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exporter))
if err != nil {
log.Fatal(err)
}
global.SetTraceProvider(tp)
return flush
}

func main() {
initTracer()
flush := initTracer()
defer flush()

tr := global.TraceProvider().Tracer("cloudtrace/example/server")

Expand Down
37 changes: 20 additions & 17 deletions exporter/trace/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,31 @@ Add `github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace`
import texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
```

Once you import the trace exporter package, then register the exporter to the application, and start tracing. If you are running in a GCP environment, the exporter will automatically authenticate using the environment's service account. If not, you will need to follow the instruction in [Authentication](#Authentication).
Once you import the trace exporter package, create and install a new export pipeline, and then you can start tracing. If you are running in a GCP environment, the exporter will automatically authenticate using the environment's service account. If not, you will need to follow the instruction in [Authentication](#Authentication).

```go
// Create exporter.
// Create exporter and trace provider pipeline, and register provider.
projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
exporter, err := texporter.NewExporter(texporter.WithProjectID(projectID))
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option {
texporter.WithProjectID(projectID),
// other optional exporter options
},
// This example code uses sdktrace.AlwaysSample sampler to sample all traces.
// In a production environment or high QPS setup please use ProbabilitySampler
// set at the desired probability.
// Example:
// sdktrace.WithConfig(sdktrace.Config {
// DefaultSampler: sdktrace.ProbabilitySampler(0.0001),
// })
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
// other optional provider options
)
if err != nil {
log.Fatalf("texporter.NewExporter: %v", err)
log.Fatalf("texporter.InstallNewPipeline: %v", err)
}
// Create trace provider with the exporter.
//
// By default it uses AlwaysSample() which samples all traces.
// In a production environment or high QPS setup please use
// ProbabilitySampler set at the desired probability.
// Example:
// config := sdktrace.Config{DefaultSampler:sdktrace.ProbabilitySampler(0.0001)}
// tp, err := sdktrace.NewProvider(sdktrace.WithConfig(config), ...)
tp, err := sdktrace.NewProvider(sdktrace.WithSyncer(exporter))
if err != nil {
log.Fatal(err)
}
global.SetTraceProvider(tp)
// before ending program, wait for all enqueued spans to be exported
defer flush()

// Create custom span.
tracer := global.TraceProvider().Tracer("example.com/trace")
Expand Down
32 changes: 27 additions & 5 deletions exporter/trace/cloudtrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import (
"golang.org/x/oauth2/google"
"google.golang.org/api/option"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
apitrace "go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// Option is function type that is passed to the exporter initialization function.
Expand Down Expand Up @@ -229,6 +232,30 @@ type Exporter struct {
traceExporter *traceExporter
}

// InstallNewPipeline instantiates a NewExportPipeline and registers it globally.
func InstallNewPipeline(opts []Option, topts ...sdktrace.ProviderOption) (apitrace.Provider, func(), error) {
tp, flush, err := NewExportPipeline(opts, topts...)
if err != nil {
return nil, nil, err
}
global.SetTraceProvider(tp)
return tp, flush, err
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup
// for trace provider. Returns provider, flush function, and errors.
func NewExportPipeline(opts []Option, topts ...sdktrace.ProviderOption) (apitrace.Provider, func(), error) {
exporter, err := NewExporter(opts...)
if err != nil {
return nil, nil, err
}
tp, err := sdktrace.NewProvider(append(topts, sdktrace.WithSyncer(exporter))...)
if err != nil {
return nil, nil, err
}
return tp, exporter.traceExporter.Flush, nil
}

// NewExporter creates a new Exporter thats implements trace.Exporter.
//
// TODO(yoshifumi): add a metrics exporter one the spec definition
Expand Down Expand Up @@ -273,11 +300,6 @@ func (e *Exporter) ExportSpan(ctx context.Context, sd *export.SpanData) {
e.traceExporter.ExportSpan(ctx, sd)
}

// ExportSpans exports a slice of SpanData to Stackdriver Trace in batch
func (e *Exporter) ExportSpans(ctx context.Context, sds []*export.SpanData) {
e.traceExporter.ExportSpans(ctx, sds)
}

func (e *Exporter) sdWithDefaultTraceAttributes(sd *export.SpanData) *export.SpanData {
newSD := *sd
for k, v := range e.traceExporter.o.DefaultTraceAttributes {
Expand Down
120 changes: 49 additions & 71 deletions exporter/trace/cloudtrace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,35 +98,29 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func TestExporter_ExportSpans(t *testing.T) {
func TestExporter_ExportSpan(t *testing.T) {
// Initial test precondition
mockTrace.spansUploaded = nil
mockTrace.delay = 0

// Create Google Cloud Trace Exporter
exp, err := texporter.NewExporter(
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
// handle bundle as soon as span is received
texporter.WithBundleCountThreshold(1),
)
assert.NoError(t, err)

tp, err := sdktrace.NewProvider(
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option {
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
// handle bundle as soon as span is received
texporter.WithBundleCountThreshold(1),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(exp, // add following two options to ensure flush
sdktrace.WithBatchTimeout(1),
sdktrace.WithMaxExportBatchSize(1),
))
)
assert.NoError(t, err)

global.SetTraceProvider(tp)
_, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), "test-span")
span.End()
assert.True(t, span.SpanContext().IsValid())

// wait exporter to flush
time.Sleep(20 * time.Millisecond)
flush()
assert.EqualValues(t, 1, mockTrace.len())
}

Expand All @@ -141,26 +135,23 @@ func TestExporter_DisplayNameFormatter(t *testing.T) {
}

// Create Google Cloud Trace Exporter
exp, err := texporter.NewExporter(
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleCountThreshold(1),
texporter.WithDisplayNameFormatter(format),
)
assert.NoError(t, err)

tp, err := sdktrace.NewProvider(
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option {
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleCountThreshold(1),
texporter.WithDisplayNameFormatter(format),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exp))
)
assert.NoError(t, err)

global.SetTraceProvider(tp)
_, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), spanName)
span.End()
assert.True(t, span.SpanContext().IsValid())

// wait exporter to flush
time.Sleep(20 * time.Millisecond)
flush()
assert.EqualValues(t, 1, mockTrace.len())
assert.EqualValues(t, "TEST_FORMAT" + spanName, mockTrace.spansUploaded[0].DisplayName.Value)
}
Expand All @@ -170,34 +161,29 @@ func TestExporter_Timeout(t *testing.T) {
mockTrace.spansUploaded = nil
mockTrace.delay = 20 * time.Millisecond
var exportErrors []error
ch := make(chan error)

// Create Google Cloud Trace Exporter
exp, err := texporter.NewExporter(
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithTimeout(1*time.Millisecond),
// handle bundle as soon as span is received
texporter.WithBundleCountThreshold(1),
texporter.WithOnError(func(err error) {
exportErrors = append(exportErrors, err)
ch <- err
}),
)
assert.NoError(t, err)

tp, err := sdktrace.NewProvider(
_, flush, err := texporter.InstallNewPipeline(
[]texporter.Option {
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithTimeout(1*time.Millisecond),
// handle bundle as soon as span is received
texporter.WithBundleCountThreshold(1),
texporter.WithOnError(func(err error) {
exportErrors = append(exportErrors, err)
}),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exp))
)
assert.NoError(t, err)

global.SetTraceProvider(tp)
_, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), "test-span")
span.End()
assert.True(t, span.SpanContext().IsValid())

// wait for error to be handled
<-ch
flush()
assert.EqualValues(t, 0, mockTrace.len())
if got, want := len(exportErrors), 1; got != want {
t.Fatalf("len(exportErrors) = %q; want %q", got, want)
Expand All @@ -217,21 +203,17 @@ func TestBundling(t *testing.T) {
}
mockTrace.mu.Unlock()

exporter, err := texporter.NewExporter(
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleDelayThreshold(time.Second / 10),
texporter.WithBundleCountThreshold(10),
)
assert.NoError(t, err)

tp, err := sdktrace.NewProvider(
_, _, err := texporter.InstallNewPipeline(
[]texporter.Option {
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleDelayThreshold(time.Second / 10),
texporter.WithBundleCountThreshold(10),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exporter))
)
assert.NoError(t, err)

global.SetTraceProvider(tp)

for i := 0; i < 35; i++ {
_, span := global.TraceProvider().Tracer("test-tracer").Start(context.Background(), "test-span")
span.End()
Expand Down Expand Up @@ -281,22 +263,18 @@ func TestBundling_ConcurrentExports(t *testing.T) {
workers := 3
spansPerWorker := 50
delay := 2 * time.Second
exporter, err := texporter.NewExporter(
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleDelayThreshold(delay),
texporter.WithBundleCountThreshold(spansPerWorker),
texporter.WithMaxNumberOfWorkers(workers),
)
assert.NoError(t, err)

tp, err := sdktrace.NewProvider(
_, _, err := texporter.InstallNewPipeline(
[]texporter.Option {
texporter.WithProjectID("PROJECT_ID_NOT_REAL"),
texporter.WithTraceClientOptions(clientOpt),
texporter.WithBundleDelayThreshold(delay),
texporter.WithBundleCountThreshold(spansPerWorker),
texporter.WithMaxNumberOfWorkers(workers),
},
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithSyncer(exporter))
)
assert.NoError(t, err)

global.SetTraceProvider(tp)

waitCh := make(chan struct{})
wg.Add(workers)

Expand Down
Loading

0 comments on commit 4bf57be

Please sign in to comment.