feat: support ability to run only the analysis controller #3336

Merged · 2 commits · Feb 3, 2024
Changes from 1 commit
93 changes: 58 additions & 35 deletions cmd/rollouts-controller/main.go
@@ -7,7 +7,6 @@ import (
"time"

"github.com/argoproj/argo-rollouts/utils/record"

"github.com/argoproj/pkg/kubeclientmetrics"
smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
log "github.com/sirupsen/logrus"
@@ -72,6 +71,7 @@ func newCommand() *cobra.Command {
namespaced bool
printVersion bool
selfServiceNotificationEnabled bool
onlyAnalysisMode bool
)
electOpts := controller.NewLeaderElectionOptions()
var command = cobra.Command{
@@ -185,41 +185,63 @@ func newCommand() *cobra.Command {
ingressWrapper, err := ingressutil.NewIngressWrapper(mode, kubeClient, kubeInformerFactory)
checkError(err)

cm := controller.NewManager(
namespace,
kubeClient,
argoprojClient,
dynamicClient,
smiClient,
discoveryClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
ingressWrapper,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
istioPrimaryDynamicClient,
istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
notificationConfigMapInformerFactory,
notificationSecretInformerFactory,
resyncDuration,
instanceID,
metricsPort,
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
var cm *controller.Manager

if onlyAnalysisMode {
log.Info("Running only analysis controller")
cm = controller.NewAnalysisManager(
namespace,
kubeClient,
argoprojClient,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
resyncDuration,
metricsPort,
healthzPort,
k8sRequestProvider,
dynamicInformerFactory,
clusterDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
} else {
cm = controller.NewManager(
namespace,
kubeClient,
argoprojClient,
dynamicClient,
smiClient,
discoveryClient,
kubeInformerFactory.Apps().V1().ReplicaSets(),
kubeInformerFactory.Core().V1().Services(),
ingressWrapper,
jobInformerFactory.Batch().V1().Jobs(),
tolerantinformer.NewTolerantRolloutInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantExperimentInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisRunInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantAnalysisTemplateInformer(dynamicInformerFactory),
tolerantinformer.NewTolerantClusterAnalysisTemplateInformer(clusterDynamicInformerFactory),
istioPrimaryDynamicClient,
istioDynamicInformerFactory.ForResource(istioutil.GetIstioVirtualServiceGVR()).Informer(),
istioDynamicInformerFactory.ForResource(istioutil.GetIstioDestinationRuleGVR()).Informer(),
notificationConfigMapInformerFactory,
notificationSecretInformerFactory,
resyncDuration,
instanceID,
metricsPort,
healthzPort,
k8sRequestProvider,
nginxIngressClasses,
albIngressClasses,
dynamicInformerFactory,
clusterDynamicInformerFactory,
istioDynamicInformerFactory,
namespaced,
kubeInformerFactory,
jobInformerFactory)
}
if err = cm.Run(ctx, rolloutThreads, serviceThreads, ingressThreads, experimentThreads, analysisThreads, electOpts); err != nil {
log.Fatalf("Error running controller: %s", err.Error())
}
@@ -262,6 +284,7 @@ func newCommand() *cobra.Command {
command.Flags().DurationVar(&electOpts.LeaderElectionRenewDeadline, "leader-election-renew-deadline", controller.DefaultLeaderElectionRenewDeadline, "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled.")
command.Flags().DurationVar(&electOpts.LeaderElectionRetryPeriod, "leader-election-retry-period", controller.DefaultLeaderElectionRetryPeriod, "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled.")
command.Flags().BoolVar(&selfServiceNotificationEnabled, "self-service-notification-enabled", false, "Allows rollouts controller to pull notification config from the namespace that the rollout resource is in. This is useful for self-service notification.")
command.Flags().BoolVar(&onlyAnalysisMode, "only-analysis-mode", false, "Only runs analysis controller")
return &command
}
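For reference, the new `--only-analysis-mode` flag is registered at the bottom of `newCommand` and gates which manager constructor is used, so an operator would enable the mode with `rollouts-controller --only-analysis-mode`. Below is a minimal, self-contained sketch of that wiring; only the flag name and description come from the diff above, and the rest (the trimmed command, the print statements) is illustrative scaffolding, not the real constructor selection:

```go
// Hypothetical, trimmed-down sketch of surfacing the new flag on a cobra command.
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func newCommand() *cobra.Command {
	var onlyAnalysisMode bool
	cmd := &cobra.Command{
		Use: "rollouts-controller",
		RunE: func(cmd *cobra.Command, args []string) error {
			// In the real controller this branch picks NewAnalysisManager vs NewManager.
			if onlyAnalysisMode {
				fmt.Println("starting analysis controller only")
			} else {
				fmt.Println("starting all controllers")
			}
			return nil
		},
	}
	cmd.Flags().BoolVar(&onlyAnalysisMode, "only-analysis-mode", false, "Only runs analysis controller")
	return cmd
}

func main() {
	cmd := newCommand()
	cmd.SetArgs([]string{"--only-analysis-mode"})
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```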

160 changes: 129 additions & 31 deletions controller/controller.go
@@ -163,6 +163,87 @@
notificationSecretInformerFactory kubeinformers.SharedInformerFactory
jobInformerFactory kubeinformers.SharedInformerFactory
istioPrimaryDynamicClient dynamic.Interface

onlyAnalysisMode bool
}

func NewAnalysisManager(
namespace string,
kubeclientset kubernetes.Interface,
argoprojclientset clientset.Interface,
jobInformer batchinformers.JobInformer,
analysisRunInformer informers.AnalysisRunInformer,
analysisTemplateInformer informers.AnalysisTemplateInformer,
clusterAnalysisTemplateInformer informers.ClusterAnalysisTemplateInformer,
resyncPeriod time.Duration,
metricsPort int,
healthzPort int,
k8sRequestProvider *metrics.K8sRequestsCountProvider,
dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
clusterDynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory,
namespaced bool,
kubeInformerFactory kubeinformers.SharedInformerFactory,
jobInformerFactory kubeinformers.SharedInformerFactory,
) *Manager {
runtime.Must(rolloutscheme.AddToScheme(scheme.Scheme))
log.Info("Creating event broadcaster")

metricsAddr := fmt.Sprintf(listenAddr, metricsPort)
metricsServer := metrics.NewMetricsServer(metrics.ServerConfig{
Addr: metricsAddr,
RolloutLister: nil,
AnalysisRunLister: analysisRunInformer.Lister(),
AnalysisTemplateLister: analysisTemplateInformer.Lister(),
ClusterAnalysisTemplateLister: clusterAnalysisTemplateInformer.Lister(),
ExperimentLister: nil,
K8SRequestProvider: k8sRequestProvider,
})

healthzServer := NewHealthzServer(fmt.Sprintf(listenAddr, healthzPort))
analysisRunWorkqueue := workqueue.NewNamedRateLimitingQueue(queue.DefaultArgoRolloutsRateLimiter(), "AnalysisRuns")
recorder := record.NewEventRecorder(kubeclientset, metrics.MetricRolloutEventsTotal, metrics.MetricNotificationFailedTotal, metrics.MetricNotificationSuccessTotal, metrics.MetricNotificationSend, nil)
analysisController := analysis.NewController(analysis.ControllerConfig{
KubeClientSet: kubeclientset,
ArgoProjClientset: argoprojclientset,
AnalysisRunInformer: analysisRunInformer,
JobInformer: jobInformer,
ResyncPeriod: resyncPeriod,
AnalysisRunWorkQueue: analysisRunWorkqueue,
MetricsServer: metricsServer,
Recorder: recorder,
})

cm := &Manager{
wg: &sync.WaitGroup{},
metricsServer: metricsServer,
healthzServer: healthzServer,
jobSynced: jobInformer.Informer().HasSynced,
analysisRunSynced: analysisRunInformer.Informer().HasSynced,
analysisTemplateSynced: analysisTemplateInformer.Informer().HasSynced,
clusterAnalysisTemplateSynced: clusterAnalysisTemplateInformer.Informer().HasSynced,
analysisRunWorkqueue: analysisRunWorkqueue,
analysisController: analysisController,
namespace: namespace,
kubeClientSet: kubeclientset,
dynamicInformerFactory: dynamicInformerFactory,
clusterDynamicInformerFactory: clusterDynamicInformerFactory,
namespaced: namespaced,
kubeInformerFactory: kubeInformerFactory,
jobInformerFactory: jobInformerFactory,
onlyAnalysisMode: true,
}

_, err := rolloutsConfig.InitializeConfig(kubeclientset, defaults.DefaultRolloutsConfigMapName)
if err != nil {
log.Fatalf("Failed to init config: %v", err)
}

err = plugin.DownloadPlugins(plugin.FileDownloaderImpl{})
if err != nil {
log.Fatalf("Failed to download plugins: %v", err)
}

return cm

}

// NewManager returns a new manager to manage all the controllers
@@ -441,11 +522,13 @@
log.Info("Shutting down workers")
goPlugin.CleanupClients()

c.serviceWorkqueue.ShutDownWithDrain()
c.ingressWorkqueue.ShutDownWithDrain()
c.rolloutWorkqueue.ShutDownWithDrain()
c.experimentWorkqueue.ShutDownWithDrain()
c.analysisRunWorkqueue.ShutDownWithDrain()
if !c.onlyAnalysisMode {
c.serviceWorkqueue.ShutDownWithDrain()
c.ingressWorkqueue.ShutDownWithDrain()
c.rolloutWorkqueue.ShutDownWithDrain()
c.experimentWorkqueue.ShutDownWithDrain()
}

c.analysisRunWorkqueue.ShutDownWithDrain()

ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) // give max of 5 seconds for http servers to shut down
@@ -463,12 +546,6 @@
// Start the informer factories to begin populating the informer caches
log.Info("Starting Controllers")

c.notificationConfigMapInformerFactory.Start(ctx.Done())
c.notificationSecretInformerFactory.Start(ctx.Done())
if ok := cache.WaitForCacheSync(ctx.Done(), c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for configmap/secret caches to sync, exiting")
}

// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
c.dynamicInformerFactory.Start(ctx.Done())
@@ -479,29 +556,50 @@

c.jobInformerFactory.Start(ctx.Done())

// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) {
c.istioDynamicInformerFactory.Start(ctx.Done())
}
if c.onlyAnalysisMode {
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.analysisRunSynced, c.analysisTemplateSynced, c.jobSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")
}

// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")
}

}
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())

} else {

// Wait for the caches to be synced before starting workers
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")
}
// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")
c.notificationConfigMapInformerFactory.Start(ctx.Done())
c.notificationSecretInformerFactory.Start(ctx.Done())
if ok := cache.WaitForCacheSync(ctx.Done(), c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for configmap/secret caches to sync, exiting")

}
}

go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done())
// Check if Istio installed on cluster before starting dynamicInformerFactory
if istioutil.DoesIstioExist(c.istioPrimaryDynamicClient, c.namespace) {
c.istioDynamicInformerFactory.Start(ctx.Done())
}

// Wait for the caches to be synced before starting workers
log.Info("Waiting for controller's informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.serviceSynced, c.ingressSynced, c.jobSynced, c.rolloutSynced, c.experimentSynced, c.analysisRunSynced, c.analysisTemplateSynced, c.replicasSetSynced, c.configMapSynced, c.secretSynced); !ok {
log.Fatalf("failed to wait for caches to sync, exiting")
}

// only wait for cluster scoped informers to sync if we are running in cluster-wide mode
if c.namespace == metav1.NamespaceAll {
if ok := cache.WaitForCacheSync(ctx.Done(), c.clusterAnalysisTemplateSynced); !ok {
log.Fatalf("failed to wait for cluster-scoped caches to sync, exiting")
}

}

go wait.Until(func() { c.wg.Add(1); c.rolloutController.Run(ctx, rolloutThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.serviceController.Run(ctx, serviceThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.ingressController.Run(ctx, ingressThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.experimentController.Run(ctx, experimentThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.analysisController.Run(ctx, analysisThreadiness); c.wg.Done() }, time.Second, ctx.Done())
go wait.Until(func() { c.wg.Add(1); c.notificationsController.Run(rolloutThreadiness, ctx.Done()); c.wg.Done() }, time.Second, ctx.Done())

}
log.Info("Started controller")
}
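Both branches of Run start their controller loops with the same `go wait.Until(...)` plus `sync.WaitGroup` idiom seen above. The following is a self-contained illustration of that idiom only; the worker closure and the timeout-driven `main` are placeholders standing in for a real controller's Run method and the manager's shutdown path:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// runWorker keeps work running until ctx is cancelled (wait.Until restarts it
// if it returns early) and tracks each in-flight run on wg so shutdown can
// drain outstanding workers before exiting.
func runWorker(ctx context.Context, wg *sync.WaitGroup, work func(context.Context)) {
	go wait.Until(func() {
		wg.Add(1)
		defer wg.Done()
		work(ctx)
	}, time.Second, ctx.Done())
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	runWorker(ctx, &wg, func(ctx context.Context) {
		<-ctx.Done() // a real controller's Run blocks here until shutdown
	})

	<-ctx.Done()
	wg.Wait()
	fmt.Println("all workers drained")
}
```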
8 changes: 6 additions & 2 deletions controller/metrics/metrics.go
@@ -55,9 +55,13 @@ func NewMetricsServer(cfg ServerConfig) *MetricsServer {

reg := prometheus.NewRegistry()

reg.MustRegister(NewRolloutCollector(cfg.RolloutLister))
if cfg.RolloutLister != nil {
reg.MustRegister(NewRolloutCollector(cfg.RolloutLister))
}
if cfg.ExperimentLister != nil {
reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister))
}
reg.MustRegister(NewAnalysisRunCollector(cfg.AnalysisRunLister, cfg.AnalysisTemplateLister, cfg.ClusterAnalysisTemplateLister))
reg.MustRegister(NewExperimentCollector(cfg.ExperimentLister))
cfg.K8SRequestProvider.MustRegister(reg)
reg.MustRegister(MetricRolloutReconcile)
reg.MustRegister(MetricRolloutReconcileError)
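These nil checks exist because NewAnalysisManager builds its MetricsServer with nil RolloutLister and ExperimentLister (see controller.go above). A minimal sketch of the same "register only what has a data source" pattern follows; stock client_golang collectors stand in for the lister-backed rollout/experiment collectors, which cannot be constructed in isolation here:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
)

// newRegistry registers the always-available collector unconditionally and the
// optional one only when its backing data source exists, mirroring the nil
// checks on RolloutLister/ExperimentLister in the diff above.
func newRegistry(haveOptionalSource bool) *prometheus.Registry {
	reg := prometheus.NewRegistry()
	if haveOptionalSource {
		reg.MustRegister(collectors.NewGoCollector())
	}
	reg.MustRegister(collectors.NewBuildInfoCollector())
	return reg
}

func main() {
	reg := newRegistry(false) // analysis-only style: optional source absent
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	fmt.Println("metric families registered:", len(mfs))
}
```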
22 changes: 12 additions & 10 deletions utils/record/record.go
@@ -208,20 +208,22 @@
e.RolloutEventCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc()
}

apis, err := e.apiFactory.GetAPIsFromNamespace(namespace)
if err != nil {
logCtx.Errorf("notifications failed to get apis for eventReason %s with error: %s", opts.EventReason, err)
e.NotificationFailedCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc()
}

for _, api := range apis {
err := e.sendNotifications(api, object, opts)
if e.apiFactory != nil {
apis, err := e.apiFactory.GetAPIsFromNamespace(namespace)
if err != nil {
logCtx.Errorf("Notifications failed to send for eventReason %s with error: %s", opts.EventReason, err)
logCtx.Errorf("notifications failed to get apis for eventReason %s with error: %s", opts.EventReason, err)
e.NotificationFailedCounter.WithLabelValues(namespace, name, opts.EventType, opts.EventReason).Inc()
}

for _, api := range apis {
err := e.sendNotifications(api, object, opts)
if err != nil {
logCtx.Errorf("Notifications failed to send for eventReason %s with error: %s", opts.EventReason, err)
}

}
}
}

}
logFn := logCtx.Infof
if warn {
logFn = logCtx.Warnf
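The guard on e.apiFactory is needed because the analysis-only event recorder is created without a notification API factory (the trailing nil passed to record.NewEventRecorder in controller.go above), so notification dispatch must be skipped rather than attempted. A hedged, stand-alone sketch of that guard with placeholder types (none of these names are the real ones) is shown below:

```go
package main

import "fmt"

// apiFactory is a stand-in for the notifications API factory; it may be nil
// when the notifications controller is not running (analysis-only mode).
type apiFactory interface {
	GetAPIsFromNamespace(namespace string) ([]string, error)
}

type recorder struct {
	apiFactory apiFactory // nil in analysis-only mode
}

// notify skips notification dispatch entirely when no factory was wired in,
// instead of calling a method on a nil interface value (which would panic).
func (r *recorder) notify(namespace, reason string) {
	if r.apiFactory == nil {
		return
	}
	apis, err := r.apiFactory.GetAPIsFromNamespace(namespace)
	if err != nil {
		fmt.Printf("notifications failed to get apis for %s: %v\n", reason, err)
		return
	}
	for range apis {
		// send the notification for each configured API in the real code
	}
}

func main() {
	r := &recorder{} // no factory wired in
	r.notify("default", "RolloutCompleted")
	fmt.Println("no panic: dispatch skipped when apiFactory is nil")
}
```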