From 1b39bd17564a3c6d2acafa03379d1aa49aa4d11f Mon Sep 17 00:00:00 2001 From: Nicolas Chotard Date: Wed, 3 Apr 2024 15:40:24 +0200 Subject: [PATCH] Fix OpenTelemetry graceful shutdown. When running OPA with the distributed tracing option enabled, the OpenTelemetry trace exporter is not gracefully shut down when the server is stopped. This PR fixes that issues by moving the trace exporter shutdown in the gracefulServerShutdown function. Fixes: #6651 Signed-off-by: Nicolas Chotard Signed-off-by: David Wobrock --- runtime/runtime.go | 14 +++++------ runtime/runtime_test.go | 55 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/runtime/runtime.go b/runtime/runtime.go index c955043cf6..899ea74a9c 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -540,13 +540,6 @@ func (rt *Runtime) Serve(ctx context.Context) error { rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start OpenTelemetry trace exporter.") return err } - - defer func() { - err := rt.traceExporter.Shutdown(ctx) - if err != nil { - rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.") - } - }() } rt.server = server.New(). @@ -863,6 +856,13 @@ func (rt *Runtime) gracefulServerShutdown(s *server.Server) error { return err } rt.logger.Info("Server shutdown.") + + if rt.traceExporter != nil { + err = rt.traceExporter.Shutdown(ctx) + if err != nil { + rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.") + } + } return nil } diff --git a/runtime/runtime_test.go b/runtime/runtime_test.go index 6c2179a364..5f80f94379 100644 --- a/runtime/runtime_test.go +++ b/runtime/runtime_test.go @@ -19,6 +19,11 @@ import ( "testing" "time" + "github.com/open-policy-agent/opa/tracing" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "github.com/open-policy-agent/opa/internal/file/archive" "github.com/open-policy-agent/opa/loader" @@ -1235,6 +1240,56 @@ func TestServerInitializedWithBundleRegoVersion(t *testing.T) { } } +func TestGracefulTracerShutdown(t *testing.T) { + fs := map[string]string{ + "/config.yaml": `{"distributed_tracing": {"type": "grpc"}}`, + } + + test.WithTempFS(fs, func(testDirRoot string) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond) + defer cancel() // NOTE(sr): The timeout will have been reached by the time `done` is closed. + + logger := logging.New() + stdout := bytes.NewBuffer(nil) + logger.SetOutput(stdout) + logger.SetLevel(logging.Warn) + + spanExp := tracetest.NewInMemoryExporter() + options := tracing.NewOptions( + otelhttp.WithTracerProvider(trace.NewTracerProvider(trace.WithSpanProcessor(trace.NewSimpleSpanProcessor(spanExp)))), + ) + + params := NewParams() + params.ConfigFile = filepath.Join(testDirRoot, "/config.yaml") + params.Addrs = &[]string{"localhost:0"} + params.GracefulShutdownPeriod = 1 + params.Logger = logger + + params.DistributedTracingOpts = options + + rt, err := NewRuntime(ctx, params) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + if rt.traceExporter == nil { + t.Fatal("traceExporter should not be nil") + } + + done := make(chan struct{}) + go func() { + rt.StartServer(ctx) + close(done) + }() + <-done + + expected := "Failed to shutdown OpenTelemetry trace exporter gracefully." + if strings.Contains(stdout.String(), expected) { + t.Fatalf("Expected no output containing: \"%v\"", expected) + } + }) +} + func TestUrlPathToConfigOverride(t *testing.T) { params := NewParams() params.Paths = []string{"https://www.example.com/bundles/bundle.tar.gz"}