From 81aa09cd882c7de53d6108a1e140938bf419d891 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Thu, 22 Jul 2021 17:55:34 +0200 Subject: [PATCH 1/9] call WaitGroup.Done() at the right time Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- pkg/controller/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8e9d425c03b..1ea4054402f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -107,12 +107,12 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) error { var wg sync.WaitGroup for i := 0; i < workers; i++ { - wg.Add(1) // TODO (@munnerz): make time.Second duration configurable - go wait.Until(func() { + wg.Add(1) + go func() { defer wg.Done() - c.worker(ctx) - }, time.Second, stopCh) + wait.Until(func() { c.worker(ctx) }, time.Second, stopCh) + }() } for _, f := range c.runFirstFuncs { From 769e05766316eae0211f219410114d63476298b5 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Thu, 22 Jul 2021 20:38:22 +0200 Subject: [PATCH 2/9] metrics server simplify Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- pkg/metrics/metrics.go | 36 ++----------------- .../certificates/metrics_controller_test.go | 14 ++++++-- 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 2d6077412cc..e4021c2281f 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -24,12 +24,10 @@ limitations under the License. package metrics import ( - "context" "net" "net/http" "time" - logf "github.com/jetstack/cert-manager/pkg/logs" "k8s.io/utils/clock" "github.com/go-logr/logr" @@ -149,7 +147,7 @@ func New(log logr.Logger, c clock.Clock) *Metrics { } // Start will register the Prometheus metrics, and start the Prometheus server -func (m *Metrics) Start(listenAddress string, enablePprof bool) (*http.Server, error) { +func (m *Metrics) NewServer(ln net.Listener, enablePprof bool) *http.Server { m.registry.MustRegister(m.clockTimeSeconds) m.registry.MustRegister(m.certificateExpiryTimeSeconds) m.registry.MustRegister(m.certificateReadyStatus) @@ -163,11 +161,6 @@ func (m *Metrics) Start(listenAddress string, enablePprof bool) (*http.Server, e profiling.Install(mux) } - ln, err := net.Listen("tcp", listenAddress) - if err != nil { - return nil, err - } - server := &http.Server{ Addr: ln.Addr().String(), ReadTimeout: prometheusMetricsServerReadTimeout, @@ -175,35 +168,10 @@ func (m *Metrics) Start(listenAddress string, enablePprof bool) (*http.Server, e MaxHeaderBytes: prometheusMetricsServerMaxHeaderBytes, Handler: mux, } - - go func() { - log := m.log.WithValues("address", ln.Addr()) - log.V(logf.InfoLevel).Info("listening for connections on") - - if err := server.Serve(ln); err != nil { - log.Error(err, "error running prometheus metrics server") - return - } - }() - - return server, nil + return server } // IncrementSyncCallCount will increase the sync counter for that controller. 
func (m *Metrics) IncrementSyncCallCount(controllerName string) { m.controllerSyncCallCount.WithLabelValues(controllerName).Inc() } - -func (m *Metrics) Shutdown(server *http.Server) { - m.log.V(logf.InfoLevel).Info("stopping Prometheus metrics server...") - - ctx, cancel := context.WithTimeout(context.Background(), prometheusMetricsServerShutdownTimeout) - defer cancel() - - if err := server.Shutdown(ctx); err != nil { - m.log.Error(err, "prometheus metrics server shutdown failed", err) - return - } - - m.log.V(logf.InfoLevel).Info("prometheus metrics server gracefully stopped") -} diff --git a/test/integration/certificates/metrics_controller_test.go b/test/integration/certificates/metrics_controller_test.go index 94a0cc07ee3..2e2dc1328a7 100644 --- a/test/integration/certificates/metrics_controller_test.go +++ b/test/integration/certificates/metrics_controller_test.go @@ -59,11 +59,21 @@ func TestMetricsController(t *testing.T) { kubernetesCl, factory, cmClient, cmFactory := framework.NewClients(t, config) metricsHandler := metrics.New(logf.Log, fixedClock) - server, err := metricsHandler.Start("127.0.0.1:0", false) + + ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatal(err) } - defer metricsHandler.Shutdown(server) + server := ctx.Metrics.NewServer(ln, false) + + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := server.Shutdown(ctx); err != nil { + return err + } + }() ctrl, queue, mustSync := controllermetrics.NewController(factory, cmFactory, metricsHandler) c := controllerpkg.NewController( From f11c44dd15fed6cc2677d57b12258b651a77a93e Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Thu, 22 Jul 2021 20:38:59 +0200 Subject: [PATCH 3/9] acme solver wait for server to shutdown Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- cmd/acmesolver/app/app.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/acmesolver/app/app.go b/cmd/acmesolver/app/app.go index 641aa644dbe..7b4954daa1e 100644 --- a/cmd/acmesolver/app/app.go +++ b/cmd/acmesolver/app/app.go @@ -18,6 +18,7 @@ package app import ( "context" + "time" "github.com/spf13/cobra" @@ -37,9 +38,15 @@ func NewACMESolverCommand(stopCh <-chan struct{}) *cobra.Command { rootCtx = logf.NewContext(rootCtx, nil, "acmesolver") log := logf.FromContext(rootCtx) + completedCh := make(chan struct{}) go func() { + defer close(completedCh) <-stopCh - if err := s.Shutdown(rootCtx); err != nil { + // allow a timeout for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := s.Shutdown(ctx); err != nil { log.Error(err, "error shutting down acmesolver server") } }() @@ -48,6 +55,8 @@ func NewACMESolverCommand(stopCh <-chan struct{}) *cobra.Command { return err } + <-completedCh + return nil }, } From 81e216eeba6cd74eab1c842b97dd35bb7a2ef3e1 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Fri, 23 Jul 2021 00:56:38 +0200 Subject: [PATCH 4/9] wait for goroutines to end before exiting Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- cmd/controller/app/BUILD.bazel | 1 + cmd/controller/app/controller.go | 66 ++++--- pkg/controller/controller.go | 2 +- pkg/webhook/authority/authority.go | 1 + pkg/webhook/server/BUILD.bazel | 2 + pkg/webhook/server/server.go | 163 +++++++----------- pkg/webhook/server/tls/dynamic_source.go | 69 +++++--- 
.../certificates/metrics_controller_test.go | 11 +- 8 files changed, 164 insertions(+), 151 deletions(-) diff --git a/cmd/controller/app/BUILD.bazel b/cmd/controller/app/BUILD.bazel index f4239ab878a..25c6d55a2a4 100644 --- a/cmd/controller/app/BUILD.bazel +++ b/cmd/controller/app/BUILD.bazel @@ -53,6 +53,7 @@ go_library( "@io_k8s_sigs_gateway_api//pkg/client/clientset/versioned/scheme:go_default_library", "@io_k8s_sigs_gateway_api//pkg/client/informers/externalversions:go_default_library", "@io_k8s_utils//clock:go_default_library", + "@org_golang_x_sync//errgroup:go_default_library", ], ) diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go index 548c2439cc3..9d7c363b7bc 100644 --- a/cmd/controller/app/controller.go +++ b/cmd/controller/app/controller.go @@ -19,10 +19,12 @@ package app import ( "context" "fmt" + "net" + "net/http" "os" - "sync" "time" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -65,10 +67,11 @@ const resyncPeriod = 10 * time.Hour func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { rootCtx := cmdutil.ContextWithStopCh(context.Background(), stopCh) + g, rootCtx := errgroup.WithContext(rootCtx) rootCtx = logf.NewContext(rootCtx, nil, "controller") log := logf.FromContext(rootCtx) - ctx, kubeCfg, err := buildControllerContext(rootCtx, stopCh, opts) + ctx, kubeCfg, err := buildControllerContext(rootCtx, opts) if err != nil { log.Error(err, "error building controller context", "options", opts) os.Exit(1) @@ -77,13 +80,32 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { enabledControllers := opts.EnabledControllers() log.Info(fmt.Sprintf("enabled controllers: %s", enabledControllers.List())) - metricsServer, err := ctx.Metrics.Start(opts.MetricsListenAddress, opts.EnablePprof) + ln, err := net.Listen("tcp", opts.MetricsListenAddress) if err != nil { log.Error(err, "failed to listen on prometheus address", "address", opts.MetricsListenAddress) os.Exit(1) } + server := ctx.Metrics.NewServer(ln, opts.EnablePprof) + + g.Go(func() error { + <-rootCtx.Done() + // allow a timeout for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := server.Shutdown(ctx); err != nil { + return err + } + return nil + }) + g.Go(func() error { + log.WithValues("address", ln.Addr()).V(logf.InfoLevel).Info("listening for connections on") + if err := server.Serve(ln); err != http.ErrServerClosed { + return err + } + return nil + }) - var wg sync.WaitGroup run := func(_ context.Context) { for n, fn := range controller.Known() { log := log.WithValues("controller", n) @@ -100,33 +122,33 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { continue } - wg.Add(1) iface, err := fn(ctx) if err != nil { log.Error(err, "error starting controller") os.Exit(1) } - go func(n string, fn controller.Interface) { - defer wg.Done() - log.V(logf.InfoLevel).Info("starting controller") - workers := 5 - err := fn.Run(workers, stopCh) + g.Go(func() error { + log.V(logf.InfoLevel).Info("starting controller", n) - if err != nil { - log.Error(err, "error starting controller") - os.Exit(1) - } - }(n, iface) + workers := 5 + return iface.Run(workers, rootCtx.Done()) + }) } log.V(logf.DebugLevel).Info("starting shared informer factories") - ctx.SharedInformerFactory.Start(stopCh) - ctx.KubeSharedInformerFactory.Start(stopCh) - ctx.GWShared.Start(stopCh) - wg.Wait() + // TODO: we 
should wait for these informers to finish + ctx.SharedInformerFactory.Start(rootCtx.Done()) + ctx.KubeSharedInformerFactory.Start(rootCtx.Done()) + ctx.GWShared.Start(rootCtx.Done()) + + err := g.Wait() + if err != nil { + log.Error(err, "error starting controller") + os.Exit(1) + } log.V(logf.InfoLevel).Info("control loops exited") - ctx.Metrics.Shutdown(metricsServer) + os.Exit(0) } @@ -145,7 +167,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, run) } -func buildControllerContext(ctx context.Context, stopCh <-chan struct{}, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) { +func buildControllerContext(ctx context.Context, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) { log := logf.FromContext(ctx, "build-context") // Load the users Kubernetes config kubeCfg, err := clientcmd.BuildConfigFromFlags(opts.APIServerHost, opts.Kubeconfig) @@ -238,7 +260,7 @@ func buildControllerContext(ctx context.Context, stopCh <-chan struct{}, opts *o return &controller.Context{ RootContext: ctx, - StopCh: stopCh, + StopCh: ctx.Done(), RESTConfig: kubeCfg, Client: cl, CMClient: intcl, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 1ea4054402f..d52ab705e58 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -107,10 +107,10 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) error { var wg sync.WaitGroup for i := 0; i < workers; i++ { - // TODO (@munnerz): make time.Second duration configurable wg.Add(1) go func() { defer wg.Done() + // TODO (@munnerz): make time.Second duration configurable wait.Until(func() { c.worker(ctx) }, time.Second, stopCh) }() } diff --git a/pkg/webhook/authority/authority.go b/pkg/webhook/authority/authority.go index 36ddadae0e3..edd954468d3 100644 --- a/pkg/webhook/authority/authority.go +++ b/pkg/webhook/authority/authority.go @@ -210,6 +210,7 @@ func (d *DynamicAuthority) WatchRotation(stopCh <-chan struct{}) <-chan struct{} ch := make(chan struct{}, 1) d.watches = append(d.watches, ch) go func() { + defer close(ch) <-stopCh d.watchMutex.Lock() defer d.watchMutex.Unlock() diff --git a/pkg/webhook/server/BUILD.bazel b/pkg/webhook/server/BUILD.bazel index 8467ed36308..6279c17cb0e 100644 --- a/pkg/webhook/server/BUILD.bazel +++ b/pkg/webhook/server/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/logs:go_default_library", + "//pkg/util:go_default_library", "//pkg/util/profiling:go_default_library", "//pkg/webhook/handlers:go_default_library", "//pkg/webhook/server/tls:go_default_library", @@ -24,6 +25,7 @@ go_library( "@io_k8s_apimachinery//pkg/util/runtime:go_default_library", "@io_k8s_component_base//cli/flag:go_default_library", "@io_k8s_sigs_controller_runtime//pkg/log:go_default_library", + "@org_golang_x_sync//errgroup:go_default_library", ], ) diff --git a/pkg/webhook/server/server.go b/pkg/webhook/server/server.go index 6c46c2d3e9c..2d1dc732708 100644 --- a/pkg/webhook/server/server.go +++ b/pkg/webhook/server/server.go @@ -27,6 +27,7 @@ import ( "time" "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" admissionv1 "k8s.io/api/admission/v1" admissionv1beta1 "k8s.io/api/admission/v1beta1" apiextensionsinstall "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/install" @@ -41,10 +42,11 @@ import ( crlog "sigs.k8s.io/controller-runtime/pkg/log" logf "github.com/jetstack/cert-manager/pkg/logs" + 
"github.com/jetstack/cert-manager/pkg/util" "github.com/jetstack/cert-manager/pkg/util/profiling" "github.com/jetstack/cert-manager/pkg/webhook/handlers" servertls "github.com/jetstack/cert-manager/pkg/webhook/server/tls" - "github.com/jetstack/cert-manager/pkg/webhook/server/util" + webhookutil "github.com/jetstack/cert-manager/pkg/webhook/server/util" ) var ( @@ -125,21 +127,12 @@ func (s *Server) Run(stopCh <-chan struct{}) error { s.Log = crlog.NullLogger{} } - internalStopCh := make(chan struct{}) - // only close the internalStopCh if it hasn't already been closed - shutdown := false - defer func() { - if !shutdown { - close(internalStopCh) - } - }() - - var healthzChan <-chan error - var certSourceChan <-chan error + gctx := util.ContextWithStopCh(context.Background(), stopCh) + g, gctx := errgroup.WithContext(gctx) // if a HealthzAddr is provided, start the healthz listener if s.HealthzAddr != "" { - l, err := net.Listen("tcp", s.HealthzAddr) + healthzListener, err := net.Listen("tcp", s.HealthzAddr) if err != nil { return err } @@ -148,20 +141,43 @@ func (s *Server) Run(stopCh <-chan struct{}) error { mux.HandleFunc("/healthz", s.handleHealthz) mux.HandleFunc("/livez", s.handleLivez) s.Log.V(logf.InfoLevel).Info("listening for insecure healthz connections", "address", s.HealthzAddr) - healthzChan = s.startServer(l, internalStopCh, mux) + server := &http.Server{ + Handler: mux, + } + g.Go(func() error { + <-gctx.Done() + // allow a timeout for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := server.Shutdown(ctx); err != nil { + return err + } + return nil + }) + g.Go(func() error { + if err := server.Serve(healthzListener); err != http.ErrServerClosed { + return err + } + return nil + }) } // create a listener for actual webhook requests - l, err := net.Listen("tcp", s.ListenAddr) + listerner, err := net.Listen("tcp", s.ListenAddr) if err != nil { return err } - s.listener = l // wrap the listener with TLS if a CertificateSource is provided if s.CertificateSource != nil { s.Log.V(logf.InfoLevel).Info("listening for secure connections", "address", s.ListenAddr) - certSourceChan = s.startCertificateSource(internalStopCh) + g.Go(func() error { + if err := s.CertificateSource.Run(gctx.Done()); (err != nil) && !errors.Is(err, context.Canceled) { + return err + } + return nil + }) cipherSuites, err := ciphers.TLSCipherSuites(s.CipherSuites) if err != nil { return err @@ -170,7 +186,7 @@ func (s *Server) Run(stopCh <-chan struct{}) error { if err != nil { return err } - l = tls.NewListener(l, &tls.Config{ + listerner = tls.NewListener(listerner, &tls.Config{ GetCertificate: s.CertificateSource.GetCertificate, CipherSuites: cipherSuites, MinVersion: minVersion, @@ -180,6 +196,7 @@ func (s *Server) Run(stopCh <-chan struct{}) error { s.Log.V(logf.InfoLevel).Info("listening for insecure connections", "address", s.ListenAddr) } + s.listener = listerner mux := http.NewServeMux() mux.HandleFunc("/validate", s.handle(s.validate)) mux.HandleFunc("/mutate", s.handle(s.mutate)) @@ -188,31 +205,28 @@ func (s *Server) Run(stopCh <-chan struct{}) error { profiling.Install(mux) s.Log.V(logf.InfoLevel).Info("registered pprof handlers") } - listenerChan := s.startServer(l, internalStopCh, mux) - - if certSourceChan == nil { - certSourceChan = blockingChan(internalStopCh) - } - if healthzChan == nil { - healthzChan = blockingChan(internalStopCh) - } - - select { - case err = <-healthzChan: - case err = <-certSourceChan: - case err = 
<-listenerChan: - case <-stopCh: + server := &http.Server{ + Handler: mux, } + g.Go(func() error { + <-gctx.Done() + // allow a timeout for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - close(internalStopCh) - shutdown = true - - s.Log.V(logf.DebugLevel).Info("waiting for server to shutdown") - waitForAll(healthzChan, certSourceChan, listenerChan) - - s.Log.V(logf.InfoLevel).Info("server shutdown successfully") + if err := server.Shutdown(ctx); err != nil { + return err + } + return nil + }) + g.Go(func() error { + if err := server.Serve(s.listener); err != http.ErrServerClosed { + return err + } + return nil + }) - return err + return g.Wait() } // Port returns the port number that the webhook listener is listening on @@ -227,67 +241,6 @@ func (s *Server) Port() (int, error) { return tcpAddr.Port, nil } -func (s *Server) startServer(l net.Listener, stopCh <-chan struct{}, handle http.Handler) <-chan error { - ch := make(chan error) - go func() { - defer close(ch) - - srv := &http.Server{ - Handler: handle, - } - select { - case err := <-channelWrapper(func() error { return srv.Serve(l) }): - ch <- err - case <-stopCh: - // allow a fixed 5s for graceful shutdown - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - if err := srv.Shutdown(ctx); err != nil { - s.Log.Error(err, "failed to gracefully shutdown http server") - ch <- err - } - s.Log.V(logf.DebugLevel).Info("shutdown HTTP server gracefully") - } - }() - return ch -} - -func (s *Server) startCertificateSource(stopCh <-chan struct{}) <-chan error { - fn := func() error { - return s.CertificateSource.Run(stopCh) - } - return channelWrapper(fn) -} - -func waitForAll(chs ...<-chan error) error { - for _, ch := range chs { - if err := <-ch; err != nil { - return fmt.Errorf("error waiting for goroutine to exit: %w", err) - } - } - return nil -} - -func channelWrapper(fn func() error) <-chan error { - ch := make(chan error) - go func() { - defer close(ch) - ch <- fn() - }() - return ch -} - -// blockingChan returns a 'no-op' error channel. -// When stopCh is closed, the error channel will also be closed. 
-func blockingChan(stopCh <-chan struct{}) <-chan error { - ch := make(chan error) - go func() { - defer close(ch) - <-stopCh - }() - return ch -} - func (s *Server) scheme() *runtime.Scheme { if s.Scheme == nil { return defaultScheme @@ -305,7 +258,7 @@ func (s *Server) validate(ctx context.Context, obj runtime.Object) (runtime.Obje return nil, errors.New("request is not of type apiextensions v1 or v1beta1") } review = &admissionv1.AdmissionReview{} - util.Convert_v1beta1_AdmissionReview_To_admission_AdmissionReview(reviewv1beta1, review) + webhookutil.Convert_v1beta1_AdmissionReview_To_admission_AdmissionReview(reviewv1beta1, review) } resp := s.ValidationWebhook.Validate(ctx, review.Request) review.Response = resp @@ -317,7 +270,7 @@ func (s *Server) validate(ctx context.Context, obj runtime.Object) (runtime.Obje // reply v1beta1 reviewv1beta1 := &admissionv1beta1.AdmissionReview{} - util.Convert_admission_AdmissionReview_To_v1beta1_AdmissionReview(review, reviewv1beta1) + webhookutil.Convert_admission_AdmissionReview_To_v1beta1_AdmissionReview(review, reviewv1beta1) return reviewv1beta1, nil } @@ -331,7 +284,7 @@ func (s *Server) mutate(ctx context.Context, obj runtime.Object) (runtime.Object return nil, errors.New("request is not of type apiextensions v1 or v1beta1") } review = &admissionv1.AdmissionReview{} - util.Convert_v1beta1_AdmissionReview_To_admission_AdmissionReview(reviewv1beta1, review) + webhookutil.Convert_v1beta1_AdmissionReview_To_admission_AdmissionReview(reviewv1beta1, review) } resp := s.MutationWebhook.Mutate(ctx, review.Request) review.Response = resp @@ -343,7 +296,7 @@ func (s *Server) mutate(ctx context.Context, obj runtime.Object) (runtime.Object // reply v1beta1 reviewv1beta1 := &admissionv1beta1.AdmissionReview{} - util.Convert_admission_AdmissionReview_To_v1beta1_AdmissionReview(review, reviewv1beta1) + webhookutil.Convert_admission_AdmissionReview_To_v1beta1_AdmissionReview(review, reviewv1beta1) return reviewv1beta1, nil } diff --git a/pkg/webhook/server/tls/dynamic_source.go b/pkg/webhook/server/tls/dynamic_source.go index 47cdf8d1e89..1b25b56ef8c 100644 --- a/pkg/webhook/server/tls/dynamic_source.go +++ b/pkg/webhook/server/tls/dynamic_source.go @@ -17,6 +17,7 @@ limitations under the License. 
 package tls

 import (
+	"context"
	"crypto"
	"crypto/tls"
	"crypto/x509"
@@ -49,7 +50,6 @@ type DynamicSource struct {
 	Log logr.Logger

 	cachedCertificate *tls.Certificate
-	nextRenew         time.Time

 	lock sync.Mutex
 }
@@ -67,6 +67,8 @@ func (f *DynamicSource) Run(stopCh <-chan struct{}) error {
 		authorityErrChan <- f.Authority.Run(stopCh)
 	}()

+	nextRenewCh := make(chan time.Time, 1)
+
 	// initially fetch a certificate from the signing CA
 	interval := time.Second
 	if err := wait.PollUntil(interval, func() (done bool, err error) {
@@ -78,18 +80,21 @@ func (f *DynamicSource) Run(stopCh <-chan struct{}) error {
 				return true, fmt.Errorf("failed to run certificate authority: %w", err)
 			}
 			if !ok {
-				return true, fmt.Errorf("certificate authority stopped")
+				return true, context.Canceled
 			}
 		default:
 			// this case avoids blocking if the authority is still running
 		}

-		if err := f.regenerateCertificate(); err != nil {
+		if err := f.regenerateCertificate(nextRenewCh); err != nil {
 			f.Log.Error(err, "Failed to generate initial serving certificate, retrying...", "interval", interval)
 			return false, nil
 		}
 		return true, nil
 	}, stopCh); err != nil {
+		// In case of an error, the stopCh is closed; wait for authorityErrChan to be closed too
+		<-authorityErrChan
+
 		return err
 	}
@@ -99,24 +104,35 @@ func (f *DynamicSource) Run(stopCh <-chan struct{}) error {
 		ch := make(chan struct{})
 		go func() {
 			defer close(ch)
+
+			var renewMoment time.Time
+			select {
+			case renewMoment = <-nextRenewCh:
+				// We received a renew moment
+			default:
+				// This should never happen
+				panic("Unreachable")
+			}
+
 			for {
-				// exit if stopCh closes
+				timer := time.NewTimer(renewMoment.Sub(time.Now()))
+				defer timer.Stop()
+
 				select {
 				case <-stopCh:
 					return
-				default:
-				}
-				// regenerate the certificate if we have gone past the 'nextRenew' time
-				if time.Now().After(f.nextRenew) {
+				case <-timer.C:
 					ch <- struct{}{}
+				case renewMoment = <-nextRenewCh:
+					// We received a renew moment, next loop iteration will update the timer
 				}
-				time.Sleep(time.Second * 5)
 			}
 		}()
 		return ch
 	}()
+
 	// check the current certificate every 10s in case it needs updating
-	return wait.PollImmediateUntil(time.Second*10, func() (done bool, err error) {
+	if err := wait.PollImmediateUntil(time.Second*10, func() (done bool, err error) {
 		// regenerate the serving certificate if the root CA has been rotated
 		select {
 		// if the authority has stopped for whatever reason, exit and return the error
@@ -125,15 +141,15 @@ func (f *DynamicSource) Run(stopCh <-chan struct{}) error {
 				return true, fmt.Errorf("failed to run certificate authority: %w", err)
 			}
 			if !ok {
-				return true, fmt.Errorf("certificate authority stopped")
+				return true, context.Canceled
 			}
 		// trigger regeneration if the root CA has been rotated
 		case _, ok := <-rotationChan:
 			if !ok {
-				return true, fmt.Errorf("channel closed")
+				return true, context.Canceled
 			}
 			f.Log.V(logf.InfoLevel).Info("Detected root CA rotation - regenerating serving certificates")
-			if err := f.regenerateCertificate(); err != nil {
+			if err := f.regenerateCertificate(nextRenewCh); err != nil {
 				f.Log.Error(err, "Failed to regenerate serving certificate")
 				// Return an error here and stop the source running - this case should never
 				// occur, and if it does, indicates some form of internal error.
@@ -142,15 +158,26 @@ func (f *DynamicSource) Run(stopCh <-chan struct{}) error { // trigger regeneration if a renewal is required case <-renewalChan: f.Log.V(logf.InfoLevel).Info("Serving certificate requires renewal, regenerating") - if err := f.regenerateCertificate(); err != nil { + if err := f.regenerateCertificate(nextRenewCh); err != nil { f.Log.Error(err, "Failed to regenerate serving certificate") // Return an error here and stop the source running - this case should never // occur, and if it does, indicates some form of internal error. return false, err } + case <-stopCh: + return true, context.Canceled } return false, nil - }, stopCh) + }, stopCh); err != nil { + // In case of an error, the stopCh is closed; wait for all channels to close + <-authorityErrChan + <-rotationChan + <-renewalChan + + return err + } + + return nil } func (f *DynamicSource) GetCertificate(*tls.ClientHelloInfo) (*tls.Certificate, error) { @@ -168,7 +195,7 @@ func (f *DynamicSource) Healthy() bool { // regenerateCertificate will trigger the cached certificate and private key to // be regenerated by requesting a new certificate from the authority. -func (f *DynamicSource) regenerateCertificate() error { +func (f *DynamicSource) regenerateCertificate(nextRenew chan<- time.Time) error { f.Log.V(logf.DebugLevel).Info("Generating new ECDSA private key") pk, err := pki.GenerateECPrivateKey(384) if err != nil { @@ -192,15 +219,13 @@ func (f *DynamicSource) regenerateCertificate() error { f.Log.V(logf.DebugLevel).Info("Signed new serving certificate") - if err := f.updateCertificate(pk, cert); err != nil { + if err := f.updateCertificate(pk, cert, nextRenew); err != nil { return err } - - f.Log.V(logf.InfoLevel).Info("Updated serving TLS certificate") return nil } -func (f *DynamicSource) updateCertificate(pk crypto.Signer, cert *x509.Certificate) error { +func (f *DynamicSource) updateCertificate(pk crypto.Signer, cert *x509.Certificate, nextRenew chan<- time.Time) error { f.lock.Lock() defer f.lock.Unlock() @@ -222,6 +247,8 @@ func (f *DynamicSource) updateCertificate(pk crypto.Signer, cert *x509.Certifica f.cachedCertificate = &bundle certDuration := cert.NotAfter.Sub(cert.NotBefore) // renew the certificate 1/3 of the time before its expiry - f.nextRenew = cert.NotAfter.Add(certDuration / -3) + nextRenew <- cert.NotAfter.Add(certDuration / -3) + + f.Log.V(logf.InfoLevel).Info("Updated serving TLS certificate") return nil } diff --git a/test/integration/certificates/metrics_controller_test.go b/test/integration/certificates/metrics_controller_test.go index 2e2dc1328a7..6f0c748c074 100644 --- a/test/integration/certificates/metrics_controller_test.go +++ b/test/integration/certificates/metrics_controller_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io/ioutil" + "net" "net/http" "strings" "testing" @@ -64,14 +65,20 @@ func TestMetricsController(t *testing.T) { if err != nil { t.Fatal(err) } - server := ctx.Metrics.NewServer(ln, false) + server := metricsHandler.NewServer(ln, false) + + go func() { + if err := server.Serve(ln); err != http.ErrServerClosed { + t.Fatal(err) + } + }() defer func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { - return err + t.Fatal(err) } }() From 632459c6d9d145b2c42fa1424c0abcdee1b63aa5 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Fri, 23 Jul 2021 15:41:24 +0200 Subject: [PATCH 5/9] resolve bug & cleanup Signed-off-by: Inteon 
<42113979+inteon@users.noreply.github.com> --- cmd/controller/app/controller.go | 2 +- pkg/metrics/metrics.go | 9 ++++----- pkg/webhook/server/BUILD.bazel | 2 +- pkg/webhook/server/server.go | 10 +++++----- pkg/webhook/server/tls/dynamic_source.go | 13 +++++++++++-- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go index 9d7c363b7bc..3b67d511761 100644 --- a/cmd/controller/app/controller.go +++ b/cmd/controller/app/controller.go @@ -129,7 +129,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { } g.Go(func() error { - log.V(logf.InfoLevel).Info("starting controller", n) + log.V(logf.InfoLevel).Info("starting controller") workers := 5 return iface.Run(workers, rootCtx.Done()) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index e4021c2281f..8ed9aa05a66 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -41,11 +41,10 @@ import ( const ( // Namespace is the namespace for cert-manager metric names - namespace = "certmanager" - prometheusMetricsServerShutdownTimeout = 5 * time.Second - prometheusMetricsServerReadTimeout = 8 * time.Second - prometheusMetricsServerWriteTimeout = 8 * time.Second - prometheusMetricsServerMaxHeaderBytes = 1 << 20 // 1 MiB + namespace = "certmanager" + prometheusMetricsServerReadTimeout = 8 * time.Second + prometheusMetricsServerWriteTimeout = 8 * time.Second + prometheusMetricsServerMaxHeaderBytes = 1 << 20 // 1 MiB ) // Metrics is designed to be a shared object for updating the metrics exposed diff --git a/pkg/webhook/server/BUILD.bazel b/pkg/webhook/server/BUILD.bazel index 6279c17cb0e..a7b47f1cc28 100644 --- a/pkg/webhook/server/BUILD.bazel +++ b/pkg/webhook/server/BUILD.bazel @@ -6,8 +6,8 @@ go_library( importpath = "github.com/jetstack/cert-manager/pkg/webhook/server", visibility = ["//visibility:public"], deps = [ + "//cmd/util:go_default_library", "//pkg/logs:go_default_library", - "//pkg/util:go_default_library", "//pkg/util/profiling:go_default_library", "//pkg/webhook/handlers:go_default_library", "//pkg/webhook/server/tls:go_default_library", diff --git a/pkg/webhook/server/server.go b/pkg/webhook/server/server.go index 2d1dc732708..61c83c287d3 100644 --- a/pkg/webhook/server/server.go +++ b/pkg/webhook/server/server.go @@ -41,8 +41,8 @@ import ( ciphers "k8s.io/component-base/cli/flag" crlog "sigs.k8s.io/controller-runtime/pkg/log" + cmdutil "github.com/jetstack/cert-manager/cmd/util" logf "github.com/jetstack/cert-manager/pkg/logs" - "github.com/jetstack/cert-manager/pkg/util" "github.com/jetstack/cert-manager/pkg/util/profiling" "github.com/jetstack/cert-manager/pkg/webhook/handlers" servertls "github.com/jetstack/cert-manager/pkg/webhook/server/tls" @@ -127,7 +127,7 @@ func (s *Server) Run(stopCh <-chan struct{}) error { s.Log = crlog.NullLogger{} } - gctx := util.ContextWithStopCh(context.Background(), stopCh) + gctx := cmdutil.ContextWithStopCh(context.Background(), stopCh) g, gctx := errgroup.WithContext(gctx) // if a HealthzAddr is provided, start the healthz listener @@ -164,7 +164,7 @@ func (s *Server) Run(stopCh <-chan struct{}) error { } // create a listener for actual webhook requests - listerner, err := net.Listen("tcp", s.ListenAddr) + listener, err := net.Listen("tcp", s.ListenAddr) if err != nil { return err } @@ -186,7 +186,7 @@ func (s *Server) Run(stopCh <-chan struct{}) error { if err != nil { return err } - listerner = tls.NewListener(listerner, &tls.Config{ + listener = tls.NewListener(listener, 
&tls.Config{
 		GetCertificate: s.CertificateSource.GetCertificate,
 		CipherSuites:   cipherSuites,
 		MinVersion:     minVersion,
@@ -196,7 +196,7 @@ func (s *Server) Run(stopCh <-chan struct{}) error {
 		s.Log.V(logf.InfoLevel).Info("listening for insecure connections", "address", s.ListenAddr)
 	}

-	s.listener = listerner
+	s.listener = listener
 	mux := http.NewServeMux()
 	mux.HandleFunc("/validate", s.handle(s.validate))
 	mux.HandleFunc("/mutate", s.handle(s.mutate))
diff --git a/pkg/webhook/server/tls/dynamic_source.go b/pkg/webhook/server/tls/dynamic_source.go
index 1b25b56ef8c..6642bacc5af 100644
--- a/pkg/webhook/server/tls/dynamic_source.go
+++ b/pkg/webhook/server/tls/dynamic_source.go
@@ -115,14 +115,23 @@ func (f *DynamicSource) Run(stopCh <-chan struct{}) error {
 			}

 			for {
-				timer := time.NewTimer(renewMoment.Sub(time.Now()))
+				timer := time.NewTimer(time.Until(renewMoment))
 				defer timer.Stop()

 				select {
 				case <-stopCh:
 					return
 				case <-timer.C:
-					ch <- struct{}{}
+					// Try to send a message on ch, but also allow for a stop signal or
+					// a new renewMoment to be received
+					select {
+					case <-stopCh:
+						return
+					case ch <- struct{}{}:
+						// Message was sent on channel
+					case renewMoment = <-nextRenewCh:
+						// We received a renew moment, next loop iteration will update the timer
+					}
 				case renewMoment = <-nextRenewCh:
 					// We received a renew moment, next loop iteration will update the timer
 				}

From 48e9c2bd16549c95de45e5e1733cf3188d7f8b80 Mon Sep 17 00:00:00 2001
From: Inteon <42113979+inteon@users.noreply.github.com>
Date: Tue, 27 Jul 2021 19:43:08 +0200
Subject: [PATCH 6/9] exit with exit code 0 on cancel & release leader-election on cancel

Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com>
---
 cmd/cainjector/app/start.go      | 19 ++++++++++---------
 cmd/controller/app/controller.go | 19 +++++++++++++------
 2 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/cmd/cainjector/app/start.go b/cmd/cainjector/app/start.go
index f122c87cb69..667b7d7651e 100644
--- a/cmd/cainjector/app/start.go
+++ b/cmd/cainjector/app/start.go
@@ -117,15 +117,16 @@ servers and webhook servers.`,
 func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) error {
 	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
-		Scheme:                  api.Scheme,
-		Namespace:               o.Namespace,
-		LeaderElection:          o.LeaderElect,
-		LeaderElectionNamespace: o.LeaderElectionNamespace,
-		LeaderElectionID:        "cert-manager-cainjector-leader-election",
-		LeaseDuration:           &o.LeaseDuration,
-		RenewDeadline:           &o.RenewDeadline,
-		RetryPeriod:             &o.RetryPeriod,
-		MetricsBindAddress:      "0",
+		Scheme:                        api.Scheme,
+		Namespace:                     o.Namespace,
+		LeaderElection:                o.LeaderElect,
+		LeaderElectionNamespace:       o.LeaderElectionNamespace,
+		LeaderElectionID:              "cert-manager-cainjector-leader-election",
+		LeaderElectionReleaseOnCancel: true,
+		LeaseDuration:                 &o.LeaseDuration,
+		RenewDeadline:                 &o.RenewDeadline,
+		RetryPeriod:                   &o.RetryPeriod,
+		MetricsBindAddress:            "0",
 	})
 	if err != nil {
 		return fmt.Errorf("error creating manager: %v", err)
diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go
index 3b67d511761..6ec3e8bff1d 100644
--- a/cmd/controller/app/controller.go
+++ b/cmd/controller/app/controller.go
@@ -337,15 +337,22 @@
 	// Try and become the leader and start controller manager loops
 	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-		Lock:          ml,
-		LeaseDuration: opts.LeaderElectionLeaseDuration,
-		RenewDeadline:
opts.LeaderElectionRenewDeadline, - RetryPeriod: opts.LeaderElectionRetryPeriod, + Lock: ml, + LeaseDuration: opts.LeaderElectionLeaseDuration, + RenewDeadline: opts.LeaderElectionRenewDeadline, + RetryPeriod: opts.LeaderElectionRetryPeriod, + ReleaseOnCancel: true, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: run, OnStoppedLeading: func() { - log.V(logf.ErrorLevel).Info("leader election lost") - os.Exit(1) + select { + case <-ctx.Done(): + // context was canceled, just return + return + default: + log.V(logf.ErrorLevel).Info("leader election lost") + os.Exit(1) + } }, }, }) From d430113666d16ad396cbd6cb421d32265d444677 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Tue, 27 Jul 2021 21:40:42 +0200 Subject: [PATCH 7/9] remove os.Exit from cert-manager controller and make sure LeaderElection ReleaseOnCancel works Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- cmd/cainjector/app/start.go | 10 +- cmd/controller/app/controller.go | 151 +++++++++++++++++++------------ cmd/controller/app/start.go | 7 +- 3 files changed, 98 insertions(+), 70 deletions(-) diff --git a/cmd/cainjector/app/start.go b/cmd/cainjector/app/start.go index 667b7d7651e..625abb6897e 100644 --- a/cmd/cainjector/app/start.go +++ b/cmd/cainjector/app/start.go @@ -145,14 +145,12 @@ func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) er return nil }) - // Don't launch the controllers unless we have been elected leader - <-mgr.Elected() - - // Exit early if the Elected channel gets closed because we are shutting down. select { - case <-gctx.Done(): + case <-gctx.Done(): // Exit early if the Elected channel gets closed because we are shutting down. + // Wait for error group to complete and return return g.Wait() - default: + case <-mgr.Elected(): // Don't launch the controllers unless we have been elected leader + // Continue with setting up controller } // Retry the start up of the certificate based controller in case the diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go index 6ec3e8bff1d..c13be3f2ae4 100644 --- a/cmd/controller/app/controller.go +++ b/cmd/controller/app/controller.go @@ -18,6 +18,7 @@ package app import ( "context" + "errors" "fmt" "net" "net/http" @@ -65,8 +66,10 @@ const controllerAgentName = "cert-manager" //and following discussion: https://github.com/kubernetes-sigs/controller-runtime/pull/88#issuecomment-408500629 const resyncPeriod = 10 * time.Hour -func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { +func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { rootCtx := cmdutil.ContextWithStopCh(context.Background(), stopCh) + rootCtx, cancelContext := context.WithCancel(rootCtx) + defer cancelContext() g, rootCtx := errgroup.WithContext(rootCtx) rootCtx = logf.NewContext(rootCtx, nil, "controller") log := logf.FromContext(rootCtx) @@ -74,7 +77,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { ctx, kubeCfg, err := buildControllerContext(rootCtx, opts) if err != nil { log.Error(err, "error building controller context", "options", opts) - os.Exit(1) + return err } enabledControllers := opts.EnabledControllers() @@ -83,7 +86,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { ln, err := net.Listen("tcp", opts.MetricsListenAddress) if err != nil { log.Error(err, "failed to listen on prometheus address", "address", opts.MetricsListenAddress) - os.Exit(1) + return err } server := ctx.Metrics.NewServer(ln, 
opts.EnablePprof) @@ -106,65 +109,99 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) { return nil }) - run := func(_ context.Context) { - for n, fn := range controller.Known() { - log := log.WithValues("controller", n) - - // only run a controller if it's been enabled - if !enabledControllers.Has(n) { - log.V(logf.InfoLevel).Info("not starting controller as it's disabled") - continue + elected := make(chan struct{}) + if opts.LeaderElect { + g.Go(func() error { + log.V(logf.InfoLevel).Info("starting leader election") + leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election")) + if err != nil { + log.Error(err, "error creating leader election client") + return err } - // don't run clusterissuers controller if scoped to a single namespace - if ctx.Namespace != "" && n == clusterissuers.ControllerName { - log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace") - continue + errorCh := make(chan error, 1) + if err := startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + close(elected) + }, + OnStoppedLeading: func() { + select { + case <-rootCtx.Done(): + // context was canceled, just return + return + default: + log.V(logf.ErrorLevel).Info("leader election lost") + errorCh <- errors.New("leader election lost") + } + }, + }); err != nil { + return err } - iface, err := fn(ctx) - if err != nil { - log.Error(err, "error starting controller") - os.Exit(1) + select { + case err := <-errorCh: + return err + default: + return nil } + }) + } else { + close(elected) + } - g.Go(func() error { - log.V(logf.InfoLevel).Info("starting controller") + select { + case <-rootCtx.Done(): // Exit early if the Elected channel gets closed because we are shutting down. 
+ // Wait for error group to complete and return + return g.Wait() + case <-elected: // Don't launch the controllers unless we have been elected leader + // Continue with setting up controller + } - workers := 5 - return iface.Run(workers, rootCtx.Done()) - }) + for n, fn := range controller.Known() { + log := log.WithValues("controller", n) + + // only run a controller if it's been enabled + if !enabledControllers.Has(n) { + log.V(logf.InfoLevel).Info("not starting controller as it's disabled") + continue } - log.V(logf.DebugLevel).Info("starting shared informer factories") - // TODO: we should wait for these informers to finish - ctx.SharedInformerFactory.Start(rootCtx.Done()) - ctx.KubeSharedInformerFactory.Start(rootCtx.Done()) - ctx.GWShared.Start(rootCtx.Done()) + // don't run clusterissuers controller if scoped to a single namespace + if ctx.Namespace != "" && n == clusterissuers.ControllerName { + log.V(logf.InfoLevel).Info("not starting controller as cert-manager has been scoped to a single namespace") + continue + } - err := g.Wait() + iface, err := fn(ctx) if err != nil { log.Error(err, "error starting controller") - os.Exit(1) + + cancelContext() + _ = g.Wait() // Don't process errors, we already have an error + return err } - log.V(logf.InfoLevel).Info("control loops exited") - os.Exit(0) - } + g.Go(func() error { + log.V(logf.InfoLevel).Info("starting controller") - if !opts.LeaderElect { - run(context.TODO()) - return + workers := 5 + return iface.Run(workers, rootCtx.Done()) + }) } - log.V(logf.InfoLevel).Info("starting leader election") - leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election")) + log.V(logf.DebugLevel).Info("starting shared informer factories") + ctx.SharedInformerFactory.Start(rootCtx.Done()) + ctx.KubeSharedInformerFactory.Start(rootCtx.Done()) + ctx.GWShared.Start(rootCtx.Done()) + + err = g.Wait() if err != nil { - log.Error(err, "error creating leader election client") - os.Exit(1) + log.Error(err, "error starting controller") + return err } + log.V(logf.InfoLevel).Info("control loops exited") - startLeaderElection(rootCtx, opts, leaderElectionClient, ctx.Recorder, run) + return nil } func buildControllerContext(ctx context.Context, opts *options.ControllerOptions) (*controller.Context, *rest.Config, error) { @@ -303,14 +340,14 @@ func buildControllerContext(ctx context.Context, opts *options.ControllerOptions }, kubeCfg, nil } -func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, run func(context.Context)) { +func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks) error { log := logf.FromContext(ctx, "leader-election") // Identity used to distinguish between multiple controller manager instances id, err := os.Hostname() if err != nil { log.Error(err, "error getting hostname") - os.Exit(1) + return err } // Set up Multilock for leader election. This Multilock is here for the @@ -331,29 +368,23 @@ func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, l if err != nil { // We should never get here. 
log.Error(err, "error creating leader election lock") - os.Exit(1) - + return err } // Try and become the leader and start controller manager loops - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: ml, LeaseDuration: opts.LeaderElectionLeaseDuration, RenewDeadline: opts.LeaderElectionRenewDeadline, RetryPeriod: opts.LeaderElectionRetryPeriod, ReleaseOnCancel: true, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, - OnStoppedLeading: func() { - select { - case <-ctx.Done(): - // context was canceled, just return - return - default: - log.V(logf.ErrorLevel).Info("leader election lost") - os.Exit(1) - } - }, - }, + Callbacks: callbacks, }) + if err != nil { + return err + } + + le.Run(ctx) + + return nil } diff --git a/cmd/controller/app/start.go b/cmd/controller/app/start.go index 5693d28abf3..975b20a86f8 100644 --- a/cmd/controller/app/start.go +++ b/cmd/controller/app/start.go @@ -73,8 +73,7 @@ to renew certificates at an appropriate time before expiry.`, } logf.Log.V(logf.InfoLevel).Info("starting controller", "version", util.AppVersion, "git-commit", util.AppGitCommit) - o.RunCertManagerController(stopCh) - return nil + return o.RunCertManagerController(stopCh) }, } @@ -91,6 +90,6 @@ func (o CertManagerControllerOptions) Validate(args []string) error { return utilerrors.NewAggregate(errors) } -func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) { - Run(o.ControllerOptions, stopCh) +func (o CertManagerControllerOptions) RunCertManagerController(stopCh <-chan struct{}) error { + return Run(o.ControllerOptions, stopCh) } From d867fcc44dcb7a9b4345641eccbe4232d4f82dc5 Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Tue, 27 Jul 2021 21:43:54 +0200 Subject: [PATCH 8/9] remove unnecessary wait.Until Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- pkg/controller/controller.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d52ab705e58..31968b4f027 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -110,8 +110,7 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) error { wg.Add(1) go func() { defer wg.Done() - // TODO (@munnerz): make time.Second duration configurable - wait.Until(func() { c.worker(ctx) }, time.Second, stopCh) + c.worker(ctx) }() } From 78d13787e61ca22620e2d276461511fdefa4a20c Mon Sep 17 00:00:00 2001 From: Inteon <42113979+inteon@users.noreply.github.com> Date: Wed, 28 Jul 2021 15:55:14 +0200 Subject: [PATCH 9/9] remove duplicated error messages & cobra help messages on error Signed-off-by: Inteon <42113979+inteon@users.noreply.github.com> --- cmd/cainjector/app/start.go | 2 +- cmd/controller/app/controller.go | 35 ++++++++++++++------------------ cmd/controller/app/start.go | 8 +++++++- cmd/controller/main.go | 2 +- 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/cmd/cainjector/app/start.go b/cmd/cainjector/app/start.go index 625abb6897e..cf80ad62882 100644 --- a/cmd/cainjector/app/start.go +++ b/cmd/cainjector/app/start.go @@ -146,7 +146,7 @@ func (o InjectorControllerOptions) RunInjectorController(ctx context.Context) er }) select { - case <-gctx.Done(): // Exit early if the Elected channel gets closed because we are shutting down. 
+ case <-gctx.Done(): // Exit early if we are shutting down or if the manager has exited with an error // Wait for error group to complete and return return g.Wait() case <-mgr.Elected(): // Don't launch the controllers unless we have been elected leader diff --git a/cmd/controller/app/controller.go b/cmd/controller/app/controller.go index c13be3f2ae4..8a467b5123a 100644 --- a/cmd/controller/app/controller.go +++ b/cmd/controller/app/controller.go @@ -29,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + utilerrors "k8s.io/apimachinery/pkg/util/errors" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -76,8 +77,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { ctx, kubeCfg, err := buildControllerContext(rootCtx, opts) if err != nil { - log.Error(err, "error building controller context", "options", opts) - return err + return fmt.Errorf("error building controller context (options %v): %v", opts, err) } enabledControllers := opts.EnabledControllers() @@ -85,8 +85,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { ln, err := net.Listen("tcp", opts.MetricsListenAddress) if err != nil { - log.Error(err, "failed to listen on prometheus address", "address", opts.MetricsListenAddress) - return err + return fmt.Errorf("failed to listen on prometheus address %s: %v", opts.MetricsListenAddress, err) } server := ctx.Metrics.NewServer(ln, opts.EnablePprof) @@ -102,7 +101,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { return nil }) g.Go(func() error { - log.WithValues("address", ln.Addr()).V(logf.InfoLevel).Info("listening for connections on") + log.V(logf.InfoLevel).Info("starting metrics server", "address", ln.Addr()) if err := server.Serve(ln); err != http.ErrServerClosed { return err } @@ -115,8 +114,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { log.V(logf.InfoLevel).Info("starting leader election") leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(kubeCfg, "leader-election")) if err != nil { - log.Error(err, "error creating leader election client") - return err + return fmt.Errorf("error creating leader election client: %v", err) } errorCh := make(chan error, 1) @@ -130,7 +128,6 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { // context was canceled, just return return default: - log.V(logf.ErrorLevel).Info("leader election lost") errorCh <- errors.New("leader election lost") } }, @@ -150,7 +147,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { } select { - case <-rootCtx.Done(): // Exit early if the Elected channel gets closed because we are shutting down. 
+ case <-rootCtx.Done(): // Exit early if we are shutting down or if the errgroup has already exited with an error // Wait for error group to complete and return return g.Wait() case <-elected: // Don't launch the controllers unless we have been elected leader @@ -174,16 +171,20 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { iface, err := fn(ctx) if err != nil { - log.Error(err, "error starting controller") + err = fmt.Errorf("error starting controller: %v", err) cancelContext() - _ = g.Wait() // Don't process errors, we already have an error + err2 := g.Wait() // Don't process errors, we already have an error + if err2 != nil { + return utilerrors.NewAggregate([]error{err, err2}) + } return err } g.Go(func() error { log.V(logf.InfoLevel).Info("starting controller") + // TODO: make this either a constant or a command line flag workers := 5 return iface.Run(workers, rootCtx.Done()) }) @@ -196,8 +197,7 @@ func Run(opts *options.ControllerOptions, stopCh <-chan struct{}) error { err = g.Wait() if err != nil { - log.Error(err, "error starting controller") - return err + return fmt.Errorf("error starting controller: %v", err) } log.V(logf.InfoLevel).Info("control loops exited") @@ -341,13 +341,10 @@ func buildControllerContext(ctx context.Context, opts *options.ControllerOptions } func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, leaderElectionClient kubernetes.Interface, recorder record.EventRecorder, callbacks leaderelection.LeaderCallbacks) error { - log := logf.FromContext(ctx, "leader-election") - // Identity used to distinguish between multiple controller manager instances id, err := os.Hostname() if err != nil { - log.Error(err, "error getting hostname") - return err + return fmt.Errorf("error getting hostname: %v", err) } // Set up Multilock for leader election. This Multilock is here for the @@ -366,9 +363,7 @@ func startLeaderElection(ctx context.Context, opts *options.ControllerOptions, l lc, ) if err != nil { - // We should never get here. - log.Error(err, "error creating leader election lock") - return err + return fmt.Errorf("error creating leader election lock: %v", err) } // Try and become the leader and start controller manager loops diff --git a/cmd/controller/app/start.go b/cmd/controller/app/start.go index 975b20a86f8..aeea620fdfe 100644 --- a/cmd/controller/app/start.go +++ b/cmd/controller/app/start.go @@ -73,8 +73,14 @@ to renew certificates at an appropriate time before expiry.`, } logf.Log.V(logf.InfoLevel).Info("starting controller", "version", util.AppVersion, "git-commit", util.AppGitCommit) - return o.RunCertManagerController(stopCh) + if err := o.RunCertManagerController(stopCh); err != nil { + cmd.SilenceUsage = true // Don't display usage information when exiting because of an error + return err + } + + return nil }, + SilenceErrors: true, // Errors are already logged when calling cmd.Execute() } flags := cmd.Flags() diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 2c3eeef2ce7..d3b3e034a29 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -36,7 +36,7 @@ func main() { flag.CommandLine.Parse([]string{}) if err := cmd.Execute(); err != nil { - logf.Log.Error(err, "error executing command") + logf.Log.Error(err, "error while executing") util.SetExitCode(err) } }
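
The series above leans on a handful of Go shutdown patterns that are easier to see outside the diffs. The sketches below are illustrative reconstructions rather than cert-manager code: worker bodies, addresses, and durations are placeholders.

PATCH 1 works because wg.Done is deferred in the goroutine that wraps wait.Until, and wait.Until only returns once stopCh closes. In the old arrangement the callback passed to wait.Until deferred Done itself, so the counter dropped at the end of the first loop iteration instead of when the worker actually exited. A minimal standalone sketch, assuming a trivial worker:

package main

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			// Done fires only when wait.Until returns, i.e. once the
			// worker has fully stopped looping on stopCh.
			defer wg.Done()
			wait.Until(func() { fmt.Println("worker", id, "tick") }, time.Second, stopCh)
		}(i)
	}

	time.Sleep(1500 * time.Millisecond)
	close(stopCh)
	wg.Wait() // returns only after every worker goroutine has exited
}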
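PATCHes 2 and 4 turn Metrics.Start into a NewServer constructor plus a pair of caller-owned goroutines, one serving and one shutting down, tied together by an errgroup; PATCH 3 reaches the same guarantee in acmesolver with a bare completedCh that main drains before returning. A minimal sketch of the errgroup variant, with a placeholder address and grace period:

package main

import (
	"context"
	"errors"
	"log"
	"net"
	"net/http"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	rootCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ln, err := net.Listen("tcp", "127.0.0.1:0") // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	server := &http.Server{Handler: http.NewServeMux()}

	g, gctx := errgroup.WithContext(rootCtx)
	g.Go(func() error {
		// Serve returns http.ErrServerClosed after a clean Shutdown, so
		// only other errors should fail the group.
		if err := server.Serve(ln); !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	})
	g.Go(func() error {
		<-gctx.Done()
		// allow a bounded grace period for in-flight requests
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return server.Shutdown(ctx)
	})

	go func() { time.Sleep(time.Second); cancel() }() // stand-in for a signal handler
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
}

Because Shutdown makes Serve return http.ErrServerClosed, treating that value as success is what lets g.Wait distinguish a clean drain from a real failure.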
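PATCHes 4 and 5 replace the five-second polling loop in dynamic_source.go with a timer fed by a nextRenewCh channel, and PATCH 5 then guards the send on ch so it can never block shutdown. The control flow, reduced to a self-contained sketch; in the real source each signal triggers regenerateCertificate, which pushes the next renew moment onto the channel:

package main

import (
	"fmt"
	"time"
)

// renewalSignals emits a signal on the returned channel whenever the
// current renew moment passes; fresher moments arrive on nextRenewCh.
func renewalSignals(stopCh <-chan struct{}, nextRenewCh <-chan time.Time) <-chan struct{} {
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		renewMoment := <-nextRenewCh // the first moment is sent before serving starts

		for {
			// Stop the timer explicitly per iteration; a defer inside the
			// loop would accumulate until the goroutine exits.
			timer := time.NewTimer(time.Until(renewMoment))

			select {
			case <-stopCh:
				timer.Stop()
				return
			case <-timer.C:
				// The receiver may be busy or gone, so stay responsive to
				// shutdown and to newer renew moments while sending.
				select {
				case <-stopCh:
					return
				case ch <- struct{}{}:
				case renewMoment = <-nextRenewCh:
				}
			case renewMoment = <-nextRenewCh:
				timer.Stop() // a fresher moment arrived; re-arm next iteration
			}
		}
	}()
	return ch
}

func main() {
	stopCh := make(chan struct{})
	nextRenewCh := make(chan time.Time, 1)
	nextRenewCh <- time.Now().Add(time.Second)

	<-renewalSignals(stopCh, nextRenewCh)
	fmt.Println("renewal due")
	close(stopCh)
}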
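The error paths PATCH 4 adds to dynamic_source.go end in bare receives such as <-authorityErrChan. That is leak avoidance: those paths only run once stopCh is closed, so the receive cannot block for long, and it guarantees the goroutine feeding the channel has finished before Run returns. The shape, with a hypothetical helper goroutine standing in for the authority:

package main

import (
	"errors"
	"fmt"
)

func run(stopCh <-chan struct{}) error {
	helperErrChan := make(chan error, 1) // hypothetical helper, not cert-manager's
	go func() {
		defer close(helperErrChan)
		<-stopCh
		helperErrChan <- errors.New("helper stopped")
	}()

	if err := mainLoop(stopCh); err != nil {
		// stopCh is closed on this path, so the helper is already on its
		// way out; block until it has really finished, then propagate.
		<-helperErrChan
		return err
	}
	return nil
}

// mainLoop stands in for the PollImmediateUntil loop in the real code.
func mainLoop(stopCh <-chan struct{}) error {
	<-stopCh
	return errors.New("stopped")
}

func main() {
	stopCh := make(chan struct{})
	close(stopCh)
	fmt.Println(run(stopCh))
}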
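PATCHes 6 and 7 move leader election from RunOrDie to an explicit NewLeaderElector with ReleaseOnCancel: true, and teach OnStoppedLeading to tell a deliberate context cancellation apart from a genuinely lost lease. A reduced sketch of that wiring; building the resourcelock needs a live Kubernetes client, so it is left out, and the durations here are placeholders:

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runWithLeaderElection(ctx context.Context, lock resourcelock.Interface, run func(context.Context)) error {
	errorCh := make(chan error, 1)

	le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 60 * time.Second,
		RenewDeadline: 40 * time.Second,
		RetryPeriod:   15 * time.Second,
		// Give the lease up on shutdown so a successor does not have to
		// wait out a full LeaseDuration before taking over.
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				select {
				case <-ctx.Done():
					// shutting down on purpose: not an error
				default:
					errorCh <- errors.New("leader election lost")
				}
			},
		},
	})
	if err != nil {
		return err
	}

	le.Run(ctx) // blocks until leadership is lost or ctx is cancelled

	select {
	case err := <-errorCh:
		return err
	default:
		return nil
	}
}

func main() {
	// Constructing a resourcelock.Interface (e.g. via resourcelock.New)
	// is environment-specific and omitted from this sketch.
	_ = runWithLeaderElection
	log.Println("sketch only: see runWithLeaderElection")
}

The select inside OnStoppedLeading mirrors the patch: losing the lease while the context is still live is fatal, but the same callback also fires during a ReleaseOnCancel shutdown, where it must stay silent.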
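PATCH 9's cobra changes rest on two flags that are easy to conflate: SilenceErrors stops cobra from printing the error a second time (main already logs it once), while SilenceUsage, set only once flag validation has passed, stops the full usage text from being dumped when a runtime failure occurs. A minimal sketch:

package main

import (
	"errors"
	"log"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use:           "example",
		SilenceErrors: true, // whoever calls Execute logs the error exactly once
		RunE: func(cmd *cobra.Command, args []string) error {
			// Past this point a failure is a runtime error, not a usage
			// mistake, so suppress the usage dump as the patch does.
			cmd.SilenceUsage = true
			return errors.New("runtime failure")
		},
	}

	if err := cmd.Execute(); err != nil {
		log.Println("error while executing:", err)
	}
}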