From a802c52e052a08727280206c9271956f62e66bf4 Mon Sep 17 00:00:00 2001 From: Nicolas Chotard Date: Tue, 7 May 2024 17:34:04 +0200 Subject: [PATCH] Fix OpenTelemetry graceful shutdown. When running OPA with the distributed tracing option enabled, the OpenTelemetry trace exporter is not gracefully shut down when the server is stopped. This PR fixes that issues by moving the trace exporter shutdown in the gracefulServerShutdown function. Fixes: #6651 Signed-off-by: Nicolas Chotard Signed-off-by: David Wobrock --- runtime/runtime.go | 14 +++++++------- runtime/runtime_test.go | 43 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/runtime/runtime.go b/runtime/runtime.go index c955043cf6..899ea74a9c 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -540,13 +540,6 @@ func (rt *Runtime) Serve(ctx context.Context) error { rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to start OpenTelemetry trace exporter.") return err } - - defer func() { - err := rt.traceExporter.Shutdown(ctx) - if err != nil { - rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.") - } - }() } rt.server = server.New(). @@ -863,6 +856,13 @@ func (rt *Runtime) gracefulServerShutdown(s *server.Server) error { return err } rt.logger.Info("Server shutdown.") + + if rt.traceExporter != nil { + err = rt.traceExporter.Shutdown(ctx) + if err != nil { + rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to shutdown OpenTelemetry trace exporter gracefully.") + } + } return nil } diff --git a/runtime/runtime_test.go b/runtime/runtime_test.go index 6c2179a364..6541d150e7 100644 --- a/runtime/runtime_test.go +++ b/runtime/runtime_test.go @@ -1235,6 +1235,49 @@ func TestServerInitializedWithBundleRegoVersion(t *testing.T) { } } +func TestGracefulTracerShutdown(t *testing.T) { + fs := map[string]string{ + "/config.yaml": `{"distributed_tracing": {"type": "grpc"}}`, + } + + test.WithTempFS(fs, func(testDirRoot string) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond) + defer cancel() // NOTE(sr): The timeout will have been reached by the time `done` is closed. + + logger := logging.New() + stdout := bytes.NewBuffer(nil) + logger.SetOutput(stdout) + logger.SetLevel(logging.Warn) + + params := NewParams() + params.ConfigFile = filepath.Join(testDirRoot, "/config.yaml") + params.Addrs = &[]string{"localhost:0"} + params.GracefulShutdownPeriod = 1 + params.Logger = logger + + rt, err := NewRuntime(ctx, params) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + if rt.traceExporter == nil { + t.Fatal("traceExporter should not be nil") + } + + done := make(chan struct{}) + go func() { + rt.StartServer(ctx) + close(done) + }() + <-done + + expected := "Failed to shutdown OpenTelemetry trace exporter gracefully." + if strings.Contains(stdout.String(), expected) { + t.Fatalf("Expected no output containing: \"%v\"", expected) + } + }) +} + func TestUrlPathToConfigOverride(t *testing.T) { params := NewParams() params.Paths = []string{"https://www.example.com/bundles/bundle.tar.gz"}