diff --git a/server/embed/config.go b/server/embed/config.go index 8394b2ae1c5..b14c0586069 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -365,6 +365,9 @@ type Config struct { // that exist at the same time. // Can only be used if ExperimentalEnableDistributedTracing is true. ExperimentalDistributedTracingServiceInstanceID string `json:"experimental-distributed-tracing-instance-id"` + // ExperimentalDistributedTracingSamplingRatePerMillion is the number of samples to collect per million spans. + // Defaults to 0. + ExperimentalDistributedTracingSamplingRatePerMillion int `json:"experimental-distributed-tracing-sampling-rate"` // Logger is logger options: currently only supports "zap". // "capnslog" is removed in v3.5. @@ -758,6 +761,13 @@ func (cfg *Config) Validate() error { return fmt.Errorf("cipher suites cannot be configured when only TLS1.3 is enabled") } + // Validate distributed tracing configuration but only if enabled. + if cfg.ExperimentalEnableDistributedTracing { + if err := validateTracingConfig(cfg.ExperimentalDistributedTracingSamplingRatePerMillion); err != nil { + return fmt.Errorf("distributed tracing configurition is not valid: (%v)", err) + } + } + return nil } diff --git a/server/embed/config_tracing.go b/server/embed/config_tracing.go index 44c2b066bac..880479e515b 100644 --- a/server/embed/config_tracing.go +++ b/server/embed/config_tracing.go @@ -16,6 +16,7 @@ package embed import ( "context" + "fmt" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" @@ -26,13 +27,32 @@ import ( "go.uber.org/zap" ) -func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) { - exporter, err = otlptracegrpc.New(ctx, +const maxSamplingRatePerMillion = 1000000 + +func validateTracingConfig(samplingRate int) error { + if samplingRate < 0 { + return fmt.Errorf("tracing sampling rate must be positive") + } + if samplingRate > maxSamplingRatePerMillion { + return fmt.Errorf("tracing sampling rate must be less than %d", maxSamplingRatePerMillion) + } + + return nil +} + +type tracingExporter struct { + exporter tracesdk.SpanExporter + opts []otelgrpc.Option + provider *tracesdk.TracerProvider +} + +func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) { + exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure(), otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress), ) if err != nil { - return nil, nil, err + return nil, err } res, err := resource.New(ctx, @@ -41,7 +61,7 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S ), ) if err != nil { - return nil, nil, err + return nil, err } if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil { @@ -49,11 +69,19 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S // resource in case of duplicates. res, err = resource.Merge(res, resWithIDKey) if err != nil { - return nil, nil, err + return nil, err } } - options = append(options, + traceProvider := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exporter), + tracesdk.WithResource(res), + tracesdk.WithSampler( + tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)), + ), + ) + + options := []otelgrpc.Option{ otelgrpc.WithPropagators( propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, @@ -61,13 +89,9 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S ), ), otelgrpc.WithTracerProvider( - tracesdk.NewTracerProvider( - tracesdk.WithBatcher(exporter), - tracesdk.WithResource(res), - tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.NeverSample())), - ), + traceProvider, ), - ) + } cfg.logger.Debug( "distributed tracing enabled", @@ -76,7 +100,29 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S zap.String("service-instance-id", cfg.ExperimentalDistributedTracingServiceInstanceID), ) - return exporter, options, err + return &tracingExporter{ + exporter: exporter, + opts: options, + provider: traceProvider, + }, nil +} + +func (te *tracingExporter) Close(ctx context.Context) { + if te.provider != nil { + te.provider.Shutdown(ctx) + } + + if te.exporter != nil { + te.exporter.Shutdown(ctx) + } +} + +func determineSampler(samplingRate int) tracesdk.Sampler { + sampler := tracesdk.NeverSample() + if samplingRate == 0 { + return sampler + } + return tracesdk.TraceIDRatioBased(float64(samplingRate) / float64(maxSamplingRatePerMillion)) } // As Tracing service Instance ID must be unique, it should diff --git a/server/embed/config_tracing_test.go b/server/embed/config_tracing_test.go new file mode 100644 index 00000000000..0abbe4d1d42 --- /dev/null +++ b/server/embed/config_tracing_test.go @@ -0,0 +1,83 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package embed + +import ( + "testing" +) + +const neverSampleDescription = "AlwaysOffSampler" + +func TestDetermineSampler(t *testing.T) { + tests := []struct { + name string + sampleRate int + wantSamplerDescription string + }{ + { + name: "sample rate is disabled", + sampleRate: 0, + wantSamplerDescription: neverSampleDescription, + }, + { + name: "sample rate is 100", + sampleRate: 100, + wantSamplerDescription: "TraceIDRatioBased{0.0001}", + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sampler := determineSampler(tc.sampleRate) + if tc.wantSamplerDescription != sampler.Description() { + t.Errorf("tracing sampler was not as expected; expected sampler: %#+v, got sampler: %#+v", tc.wantSamplerDescription, sampler.Description()) + } + }) + } +} + +func TestTracingConfig(t *testing.T) { + tests := []struct { + name string + sampleRate int + wantErr bool + }{ + { + name: "invalid - sample rate is less than 0", + sampleRate: -1, + wantErr: true, + }, + { + name: "invalid - sample rate is more than allowed value", + sampleRate: maxSamplingRatePerMillion + 1, + wantErr: true, + }, + { + name: "valid - sample rate is 100", + sampleRate: 100, + wantErr: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := validateTracingConfig(tc.sampleRate) + if err == nil && tc.wantErr { + t.Errorf("expected error got (%v) error", err) + } + if err != nil && !tc.wantErr { + t.Errorf("expected no errors, got error: (%v)", err) + } + }) + } +} diff --git a/server/embed/etcd.go b/server/embed/etcd.go index b4f4defe54e..67764968fbf 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -228,15 +228,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if srvcfg.ExperimentalEnableDistributedTracing { tctx := context.Background() - tracingExporter, opts, err := setupTracingExporter(tctx, cfg) + tracingExporter, err := newTracingExporter(tctx, cfg) if err != nil { return e, err } - if tracingExporter == nil || len(opts) == 0 { - return e, fmt.Errorf("error setting up distributed tracing") + e.tracingExporterShutdown = func() { + tracingExporter.Close(tctx) } - e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) } - srvcfg.ExperimentalTracerOptions = opts + srvcfg.ExperimentalTracerOptions = tracingExporter.opts e.cfg.logger.Info("distributed tracing setup enabled") } diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 5671484ab47..7988f1753da 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -276,6 +276,7 @@ func newConfig() *config { fs.StringVar(&cfg.ec.ExperimentalDistributedTracingAddress, "experimental-distributed-tracing-address", embed.ExperimentalDistributedTracingAddress, "Address for distributed tracing used for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).") fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceName, "experimental-distributed-tracing-service-name", embed.ExperimentalDistributedTracingServiceName, "Configures service name for distributed tracing to be used to define service name for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). 'etcd' is the default service name. Use the same service name for all instances of etcd.") fs.StringVar(&cfg.ec.ExperimentalDistributedTracingServiceInstanceID, "experimental-distributed-tracing-instance-id", "", "Configures service instance ID for distributed tracing to be used to define service instance ID key for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag). There is no default value set. This ID must be unique per etcd instance.") + fs.IntVar(&cfg.ec.ExperimentalDistributedTracingSamplingRatePerMillion, "experimental-distributed-tracing-sampling-rate", 0, "Number of samples to collect per million spans for OpenTelemetry Tracing (if enabled with experimental-enable-distributed-tracing flag).") // auth fs.StringVar(&cfg.ec.AuthToken, "auth-token", cfg.ec.AuthToken, "Specify auth token specific options.") diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 806e82b002e..054b097a650 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -219,6 +219,8 @@ Experimental distributed tracing: Distributed tracing service name, must be same across all etcd instances. --experimental-distributed-tracing-instance-id '' Distributed tracing instance ID, must be unique per each etcd instance. + --experimental-distributed-tracing-sampling-rate '0' + Number of samples to collect per million spans for distributed tracing. Disabled by default. v2 Proxy (to be deprecated in v3.6): --proxy 'off' diff --git a/tests/go.mod b/tests/go.mod index daa6b429a7b..f21dea54a25 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -36,6 +36,11 @@ require ( go.etcd.io/etcd/pkg/v3 v3.5.10 go.etcd.io/etcd/raft/v3 v3.5.10 go.etcd.io/etcd/server/v3 v3.5.10 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 + go.opentelemetry.io/otel v1.20.0 + go.opentelemetry.io/otel/sdk v1.20.0 + go.opentelemetry.io/otel/trace v1.20.0 + go.opentelemetry.io/proto/otlp v1.0.0 go.uber.org/zap v1.17.0 golang.org/x/crypto v0.14.0 golang.org/x/sync v0.3.0 @@ -72,14 +77,9 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.8 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect - go.opentelemetry.io/otel v1.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect go.opentelemetry.io/otel/metric v1.20.0 // indirect - go.opentelemetry.io/otel/sdk v1.20.0 // indirect - go.opentelemetry.io/otel/trace v1.20.0 // indirect - go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/tests/integration/tracing_test.go b/tests/integration/tracing_test.go new file mode 100644 index 00000000000..934b8bf347c --- /dev/null +++ b/tests/integration/tracing_test.go @@ -0,0 +1,149 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1" + "google.golang.org/grpc" + + "go.etcd.io/etcd/client/pkg/v3/testutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" +) + +// TestTracing ensures that distributed tracing is setup when the feature flag is enabled. +func TestTracing(t *testing.T) { + BeforeTest(t) + testutil.SkipTestIfShortMode(t, + "Wal creation tests are depending on embedded etcd server so are integration-level tests.") + // set up trace collector + listener, err := net.Listen("tcp", "localhost:") + if err != nil { + t.Fatal(err) + } + + traceFound := make(chan struct{}) + defer close(traceFound) + + srv := grpc.NewServer() + traceservice.RegisterTraceServiceServer(srv, &traceServer{ + traceFound: traceFound, + filterFunc: containsNodeListSpan}) + + go srv.Serve(listener) + defer srv.Stop() + + cfg := NewEmbedConfig(t, "default") + cfg.ExperimentalEnableDistributedTracing = true + cfg.ExperimentalDistributedTracingAddress = listener.Addr().String() + cfg.ExperimentalDistributedTracingServiceName = "integration-test-tracing" + cfg.ExperimentalDistributedTracingSamplingRatePerMillion = 100 + + // start an etcd instance with tracing enabled + etcdSrv, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + defer etcdSrv.Close() + + select { + case <-etcdSrv.Server.ReadyNotify(): + case <-time.After(5 * time.Second): + // default randomized election timeout is 1 to 2s, single node will fast-forward 900ms + // change the timeout from 1 to 5 seconds to ensure de-flaking this test + t.Fatalf("failed to start embed.Etcd for test") + } + + // create a client that has tracing enabled + tracer := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) + defer tracer.Shutdown(context.TODO()) + tp := trace.TracerProvider(tracer) + + tracingOpts := []otelgrpc.Option{ + otelgrpc.WithTracerProvider(tp), + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )), + } + + dialOptions := []grpc.DialOption{ + grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)), + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))} + ccfg := clientv3.Config{DialOptions: dialOptions, Endpoints: []string{cfg.AdvertiseClientUrls[0].String()}} + cli, err := NewClient(t, ccfg) + if err != nil { + etcdSrv.Close() + t.Fatal(err) + } + defer cli.Close() + + // make a request with the instrumented client + resp, err := cli.Get(context.TODO(), "key") + require.NoError(t, err) + require.Empty(t, resp.Kvs) + + // Wait for a span to be recorded from our request + select { + case <-traceFound: + return + case <-time.After(30 * time.Second): + t.Fatal("Timed out waiting for trace") + } +} + +func containsNodeListSpan(req *traceservice.ExportTraceServiceRequest) bool { + for _, resourceSpans := range req.GetResourceSpans() { + for _, attr := range resourceSpans.GetResource().GetAttributes() { + if attr.GetKey() != "service.name" && attr.GetValue().GetStringValue() != "integration-test-tracing" { + continue + } + for _, scoped := range resourceSpans.GetScopeSpans() { + for _, span := range scoped.GetSpans() { + if span.GetName() == "etcdserverpb.KV/Range" { + return true + } + } + } + } + } + return false +} + +// traceServer implements TracesServiceServer +type traceServer struct { + traceFound chan struct{} + filterFunc func(req *traceservice.ExportTraceServiceRequest) bool + traceservice.UnimplementedTraceServiceServer +} + +func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) { + var emptyValue = traceservice.ExportTraceServiceResponse{} + if t.filterFunc(req) { + t.traceFound <- struct{}{} + } + return &emptyValue, nil +}