diff --git a/cmd/rollouts-controller/main.go b/cmd/rollouts-controller/main.go index ad7190c585..99afcef49e 100644 --- a/cmd/rollouts-controller/main.go +++ b/cmd/rollouts-controller/main.go @@ -7,7 +7,6 @@ import ( "time" "github.com/argoproj/argo-rollouts/utils/record" - "github.com/argoproj/pkg/kubeclientmetrics" smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned" log "github.com/sirupsen/logrus" @@ -42,8 +41,12 @@ const ( cliName = "argo-rollouts" jsonFormat = "json" textFormat = "text" + + controllerAnalysis = "analysis" ) +var supportedControllers = map[string]bool{controllerAnalysis: true} + func newCommand() *cobra.Command { var ( clientConfig clientcmd.ClientConfig @@ -72,6 +75,7 @@ func newCommand() *cobra.Command { namespaced bool printVersion bool selfServiceNotificationEnabled bool + controllersEnabled []string ) electOpts := controller.NewLeaderElectionOptions() var command = cobra.Command{ @@ -185,41 +189,67 @@ func newCommand() *cobra.Command { ingressWrapper, err := ingressutil.NewIngressWrapper(mode, kubeClient, kubeInformerFactory) checkError(err) - cm := controller.NewManager( - namespace, - kubeClient, - argoprojClient, - dynamicClient, - smiClient, - discoveryClient, - kubeInformerFactory.Apps().V1().ReplicaSets(), - kubeInformerFactory.Core().V1().Services(), - ingressWrapper, - jobInformerFactory.Batch().V1().Jobs(), - tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory), - tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory), - tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory), - tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory), - tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory), - istioPrimaryDynamicClient, - istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(), - 
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(), - notificationConfigMapInformerFactory, - notificationSecretInformerFactory, - resyncDuration, - instanceID, - metricsPort, - healthzPort, - k8sRequestProvider, - nginxIngressClasses, - albIngressClasses, - dynamicInformerFactory, - clusterDynamicInformerFactory, - istioDynamicInformerFactory, - namespaced, - kubeInformerFactory, - jobInformerFactory) + var cm *controller.Manager + + enabledControllers, err := getEnabledControllers(controllersEnabled) + checkError(err) + // currently only supports running analysis controller independently + if enabledControllers[controllerAnalysis] { + log.Info("Running only analysis controller") + cm = controller.NewAnalysisManager( + namespace, + kubeClient, + argoprojClient, + jobInformerFactory.Batch().V1().Jobs(), + tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory), + tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory), + tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory), + resyncDuration, + metricsPort, + healthzPort, + k8sRequestProvider, + dynamicInformerFactory, + clusterDynamicInformerFactory, + namespaced, + kubeInformerFactory, + jobInformerFactory) + } else { + cm = controller.NewManager( + namespace, + kubeClient, + argoprojClient, + dynamicClient, + smiClient, + discoveryClient, + kubeInformerFactory.Apps().V1().ReplicaSets(), + kubeInformerFactory.Core().V1().Services(), + ingressWrapper, + jobInformerFactory.Batch().V1().Jobs(), + tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory), + tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory), + tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory), + tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory), + tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory), + 
istioPrimaryDynamicClient, + istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(), + istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(), + notificationConfigMapInformerFactory, + notificationSecretInformerFactory, + resyncDuration, + instanceID, + metricsPort, + healthzPort, + k8sRequestProvider, + nginxIngressClasses, + albIngressClasses, + dynamicInformerFactory, + clusterDynamicInformerFactory, + istioDynamicInformerFactory, + namespaced, + kubeInformerFactory, + jobInformerFactory) + } if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil { log.Fatalf("Error running controller: %s", err.Error()) } @@ -262,6 +292,7 @@ func newCommand() *cobra.Command { command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.") command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.") command.Flags().BoolVar(&selfServiceNotificationEnabled, "self-service-notification-enabled", false, "Allows rollouts controller to pull notification config from the namespace that the rollout resource is in. This is useful for self-service notification.") + command.Flags().StringSliceVar(&controllersEnabled, "controllers", nil, "Explicitly specify the list of controllers to run, currently only supports 'analysis', e.g. --controllers=analysis. 
Default: all controllers are enabled") return &command } @@ -315,3 +346,15 @@ func checkError(err error) { log.Fatal(err) } } + +func getEnabledControllers(controllersEnabled []string) (map[string]bool, error) { + enabledControllers := make(map[string]bool) + for _, controller := range controllersEnabled { + if supportedControllers[controller] { + enabledControllers[controller] = true + } else { + return nil, fmt.Errorf("unsupported controller: %s", controller) + } + } + return enabledControllers, nil +} diff --git a/controller/controller.go b/controller/controller.go index afe4bc769b..9aaf8ee09f 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -163,6 +163,87 @@ type Manager struct { notificationSecretInformerFactory kubeinformers.SharedInformerFactory jobInformerFactory kubeinformers.SharedInformerFactory istioPrimaryDynamicClient dynamic.Interface + + onlyAnalysisMode bool +} + +func NewAnalysisManager( + namespace string, + kubeclientset kubernetes.Interface, + argoprojclientset clientset.Interface, + jobInformer batchinformers.JobInformer, + analysisRunInformer informers.AnalysisRunInformer, + analysisTemplateInformer informers.AnalysisTemplateInformer, + clusterAnalysisTemplateInformer informers.ClusterAnalysisTemplateInformer, + resyncPeriod time.Duration, + metricsPort int, + healthzPort int, + k8sRequestProvider *metrics.K8sRequestsCountProvider, + dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory, + clusterDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory, + namespaced bool, + kubeInformerFactory kubeinformers.SharedInformerFactory, + jobInformerFactory kubeinformers.SharedInformerFactory, +) *Manager { + runtime.Must(rolloutscheme.AddToScheme(scheme.Scheme)) + log.Info("Creating event broadcaster") + + metricsAddr := fmt.Sprintf(listenAddr, metricsPort) + metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{ + Addr: metricsAddr, + RolloutLister: nil, + AnalysisRunLister: 
analysisRunInformer.Lister(), + AnalysisTemplateLister: analysisTemplateInformer.Lister(), + ClusterAnalysisTemplateLister: clusterAnalysisTemplateInformer.Lister(), + ExperimentLister: nil, + K8SRequestProvider: k8sRequestProvider, + }) + + healthzServer := NewHealthzServer(fmt.Sprintf(listenAddr, healthzPort)) + analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns") + recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, nil) + analysisController := analysis.NewController(analysis.ControllerConfig{ + KubeClientSet: kubeclientset, + ArgoProjClientset: argoprojclientset, + AnalysisRunInformer: analysisRunInformer, + JobInformer: jobInformer, + ResyncPeriod: resyncPeriod, + AnalysisRunWorkQueue: analysisRunWorkqueue, + MetricsServer: metricsServer, + Recorder: recorder, + }) + + cm := &Manager{ + wg: &sync.WaitGroup{}, + metricsServer: metricsServer, + healthzServer: healthzServer, + jobSynced: jobInformer.Informer().HasSynced, + analysisRunSynced: analysisRunInformer.Informer().HasSynced, + analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced, + clusterAnalysisTemplateSynced: clusterAnalysisTemplateInformer.Informer().HasSynced, + analysisRunWorkqueue: analysisRunWorkqueue, + analysisController: analysisController, + namespace: namespace, + kubeClientSet: kubeclientset, + dynamicInformerFactory: dynamicInformerFactory, + clusterDynamicInformerFactory: clusterDynamicInformerFactory, + namespaced: namespaced, + kubeInformerFactory: kubeInformerFactory, + jobInformerFactory: jobInformerFactory, + onlyAnalysisMode: true, + } + + _, err := rolloutsConfig.InitializeConfig(kubeclientset, defaults.DefaultRolloutsConfigMapName) + if err != nil { + log.Fatalf("Failed to init config: %v", err) + } + + err = plugin.DownloadPlugins(plugin.FileDownloaderImpl{}) + 
if err != nil { + log.Fatalf("Failed to download plugins: %v", err) + } + + return cm } // NewManager returns a new manager to manage all the controllers @@ -441,11 +522,13 @@ func (c *Manager) Run(ctx context.Context, rolloutThreadiness, serviceThreadines log.Info("Shutting down workers") goPlugin.CleanupClients() - c.serviceWorkqueue.ShutDownWithDrain() - c.ingressWorkqueue.ShutDownWithDrain() - c.rolloutWorkqueue.ShutDownWithDrain() - c.experimentWorkqueue.ShutDownWithDrain() - c.analysisRunWorkqueue.ShutDownWithDrain() + if !c.onlyAnalysisMode { + c.serviceWorkqueue.ShutDownWithDrain() + c.ingressWorkqueue.ShutDownWithDrain() + c.rolloutWorkqueue.ShutDownWithDrain() + c.experimentWorkqueue.ShutDownWithDrain() + } + c.analysisRunWorkqueue.ShutDownWithDrain() ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) // give max of 10 seconds for http servers to shut down @@ -463,12 +546,6 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT // Start the informer factories to begin populating the informer caches log.Info("Starting Controllers") - c.notificationConfigMapInformerFactory.Start(ctx.Done()) - c.notificationSecretInformerFactory.Start(ctx.Done()) - if ok := cache.WaitForCacheSync(ctx.Done(), c.configMapSynced, c.secretSynced); !ok { - log.Fatalf("failed to wait for configmap/secret caches to sync, exiting") - } - // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh) // Start method is non-blocking and runs all registered informers in a dedicated goroutine. 
c.dynamicInformerFactory.Start(ctx.Done()) @@ -479,29 +556,50 @@ func (c *Manager) startLeading(ctx context.Context, rolloutThreadiness, serviceT c.jobInformerFactory.Start(ctx.Done()) - // Check if Istio installed on cluster before starting dynamicInformerFactory - if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) { - c.istioDynamicInformerFactory.Start(ctx.Done()) - } + if c.onlyAnalysisMode { + log.Info("Waiting for controller's informer caches to sync") + if ok := cache.WaitForCacheSync(ctx.Done(), c.analysisRunSynced, c.analysisTemplateSynced, c.jobSynced); !ok { + log.Fatalf("failed to wait for caches to sync, exiting") + } + // only wait for cluster scoped informers to sync if we are running in cluster-wide mode + if c.namespace == metav1.NamespaceAll { + if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok { + log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting") + } + } + go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done()) + } else { - // Wait for the caches to be synced before starting workers - log.Info("Waiting for controller's informer caches to sync") - if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok { - log.Fatalf("failed to wait for caches to sync, exiting") - } - // only wait for cluster scoped informers to sync if we are running in cluster-wide mode - if c.namespace == metav1.NamespaceAll { - if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok { - log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting") + c.notificationConfigMapInformerFactory.Start(ctx.Done()) + c.notificationSecretInformerFactory.Start(ctx.Done()) + if ok := cache.WaitForCacheSync(ctx.Done(), 
c.configMapSynced, c.secretSynced); !ok { + log.Fatalf("failed to wait for configmap/secret caches to sync, exiting") } - } - go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done()) - go wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done()) - go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done()) - go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done()) - go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done()) - go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done()) + // Check if Istio installed on cluster before starting dynamicInformerFactory + if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) { + c.istioDynamicInformerFactory.Start(ctx.Done()) + } + // Wait for the caches to be synced before starting workers + log.Info("Waiting for controller's informer caches to sync") + if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok { + log.Fatalf("failed to wait for caches to sync, exiting") + } + // only wait for cluster scoped informers to sync if we are running in cluster-wide mode + if c.namespace == metav1.NamespaceAll { + if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok { + log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting") + } + } + + go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done()) + go 
wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done()) + go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done()) + go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done()) + go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done()) + go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done()) + + } log.Info("Started controller") } diff --git a/controller/metrics/metrics.go b/controller/metrics/metrics.go index bad18a1eb3..9691b73a40 100644 --- a/controller/metrics/metrics.go +++ b/controller/metrics/metrics.go @@ -55,9 +55,13 @@ func NewMetricsServer(cfg ServerConfig) *MetricsServer { reg := prometheus.NewRegistry() - reg.MustRegister(NewRolloutCollector(cfg.RolloutLister)) + if cfg.RolloutLister != nil { + reg.MustRegister(NewRolloutCollector(cfg.RolloutLister)) + } + if cfg.ExperimentLister != nil { + reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister)) + } reg.MustRegister(NewAnalysisRunCollector(cfg.AnalysisRunLister, cfg.AnalysisTemplateLister, cfg.ClusterAnalysisTemplateLister)) - reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister)) cfg.K8SRequestProvider.MustRegister(reg) reg.MustRegister(MetricRolloutReconcile) reg.MustRegister(MetricRolloutReconcileError) diff --git a/utils/record/record.go b/utils/record/record.go index 859b954390..9e50f3ba65 100644 --- a/utils/record/record.go +++ b/utils/record/record.go @@ -208,20 +208,22 @@ func (e *EventRecorderAdapter) defaultEventf(object runtime.Object, warn bool, o e.RolloutEventCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc() } - apis, err := e.apiFactory.GetAPIsFromNamespace(namespace) - if err != nil { - 
logCtx.Errorf("notifications failed to get apis for eventReason %s with error: %s", opts.EventReason, err) - e.NotificationFailedCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc() - } - - for _, api := range apis { - err := e.sendNotifications(api, object, opts) + if e.apiFactory != nil { + apis, err := e.apiFactory.GetAPIsFromNamespace(namespace) if err != nil { - logCtx.Errorf("Notifications failed to send for eventReason %s with error: %s", opts.EventReason, err) + logCtx.Errorf("notifications failed to get apis for eventReason %s with error: %s", opts.EventReason, err) + e.NotificationFailedCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc() + } + + for _, api := range apis { + err := e.sendNotifications(api, object, opts) + if err != nil { + logCtx.Errorf("Notifications failed to send for eventReason %s with error: %s", opts.EventReason, err) + } } } - } + } logFn := logCtx.Infof if warn { logFn = logCtx.Warnf