From 18e65f677c183fbe4beb406557b217727ec7d5e2 Mon Sep 17 00:00:00 2001 From: Jian Zeng Date: Thu, 5 Nov 2020 17:53:46 +0800 Subject: [PATCH] feat(controllers): make resyncPeriod configurable --- pkg/workflow/common/const.go | 4 ---- pkg/workflow/controller/config.go | 10 ++++++++++ .../controller/controllers/exucution_cluster.go | 5 +++-- pkg/workflow/controller/controllers/pod.go | 5 +++-- .../controller/controllers/workflow_trigger.go | 5 +++-- pkg/workflow/controller/controllers/workflowrun.go | 4 ++-- pkg/workflow/workflowrun/limits.go | 4 ++-- 7 files changed, 23 insertions(+), 14 deletions(-) diff --git a/pkg/workflow/common/const.go b/pkg/workflow/common/const.go index ab43c3039..b4deced1d 100644 --- a/pkg/workflow/common/const.go +++ b/pkg/workflow/common/const.go @@ -2,16 +2,12 @@ package common import ( "fmt" - "time" ) // ContainerState represents container state. type ContainerState string const ( - // ResyncPeriod defines resync period for controllers - ResyncPeriod = time.Minute * 5 - // EnvStagePodName is an environment which represents pod name. EnvStagePodName = "POD_NAME" // EnvStageInfo is an environment which represents stage information. diff --git a/pkg/workflow/controller/config.go b/pkg/workflow/controller/config.go index daaed21d6..e6a836d46 100644 --- a/pkg/workflow/controller/config.go +++ b/pkg/workflow/controller/config.go @@ -50,6 +50,8 @@ type WorkflowControllerConfig struct { NotificationURL string `json:"notification_url"` // DindSettings is settings for Docker in Docker DindSettings DindSettings `json:"dind"` + // ResyncPeriodSeconds defines resync period in seconds for controllers + ResyncPeriodSeconds time.Duration `json:"resync_period_seconds"` } // LoggingConfig configures logging @@ -113,16 +115,24 @@ func LoadConfig(cm *corev1.ConfigMap) error { return err } + if Config.ResyncPeriodSeconds == 0 { + Config.ResyncPeriodSeconds = 180 // 3min + } if !validate(&Config) { return fmt.Errorf("validate config failed") } InitLogger(&Config.Logging) + log.Infof("ResyncPeriod is %s", Config.ResyncPeriodSeconds*time.Second) return nil } // validate validates some required configurations. func validate(config *WorkflowControllerConfig) bool { + if config.ResyncPeriodSeconds < 0 { + log.Errorf("Invalid ResyncPeriodSeconds: %d", config.ResyncPeriodSeconds) + return false + } if config.ExecutionContext.PVC == "" { log.Warn("PVC not configured, resources won't be shared among stages and artifacts unsupported.") } diff --git a/pkg/workflow/controller/controllers/exucution_cluster.go b/pkg/workflow/controller/controllers/exucution_cluster.go index 3e89ce8c8..443389136 100644 --- a/pkg/workflow/controller/controllers/exucution_cluster.go +++ b/pkg/workflow/controller/controllers/exucution_cluster.go @@ -2,13 +2,14 @@ package controllers import ( "reflect" + "time" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "github.com/caicloud/cyclone/pkg/k8s/clientset" "github.com/caicloud/cyclone/pkg/k8s/informers" - "github.com/caicloud/cyclone/pkg/workflow/common" + "github.com/caicloud/cyclone/pkg/workflow/controller" "github.com/caicloud/cyclone/pkg/workflow/controller/handlers/executioncluster" ) @@ -17,7 +18,7 @@ func NewExecutionClusterController(client clientset.Interface) *Controller { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) factory := informers.NewSharedInformerFactoryWithOptions( client, - common.ResyncPeriod, + controller.Config.ResyncPeriodSeconds*time.Second, ) informer := factory.Cyclone().V1alpha1().ExecutionClusters().Informer() diff --git a/pkg/workflow/controller/controllers/pod.go b/pkg/workflow/controller/controllers/pod.go index aeef2e635..a2ee63f48 100644 --- a/pkg/workflow/controller/controllers/pod.go +++ b/pkg/workflow/controller/controllers/pod.go @@ -2,6 +2,7 @@ package controllers import ( "reflect" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -11,7 +12,7 @@ import ( "github.com/caicloud/cyclone/pkg/k8s/clientset" "github.com/caicloud/cyclone/pkg/meta" - "github.com/caicloud/cyclone/pkg/workflow/common" + "github.com/caicloud/cyclone/pkg/workflow/controller" "github.com/caicloud/cyclone/pkg/workflow/controller/handlers/pod" ) @@ -20,7 +21,7 @@ func NewPodController(clusterClient kubernetes.Interface, client clientset.Inter queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) factory := informers.NewSharedInformerFactoryWithOptions( clusterClient, - common.ResyncPeriod, + controller.Config.ResyncPeriodSeconds*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = meta.CyclonePodSelector() }), diff --git a/pkg/workflow/controller/controllers/workflow_trigger.go b/pkg/workflow/controller/controllers/workflow_trigger.go index d69a80d99..96ac3ed6e 100644 --- a/pkg/workflow/controller/controllers/workflow_trigger.go +++ b/pkg/workflow/controller/controllers/workflow_trigger.go @@ -2,6 +2,7 @@ package controllers import ( "reflect" + "time" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -11,7 +12,7 @@ import ( "github.com/caicloud/cyclone/pkg/k8s/clientset" "github.com/caicloud/cyclone/pkg/k8s/informers" "github.com/caicloud/cyclone/pkg/meta" - "github.com/caicloud/cyclone/pkg/workflow/common" + "github.com/caicloud/cyclone/pkg/workflow/controller" "github.com/caicloud/cyclone/pkg/workflow/controller/handlers/workflowtrigger" ) @@ -20,7 +21,7 @@ func NewWorkflowTriggerController(client clientset.Interface) *Controller { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) factory := informers.NewSharedInformerFactoryWithOptions( client, - common.ResyncPeriod, + controller.Config.ResyncPeriodSeconds*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = meta.WorkflowTriggerSelector() }), diff --git a/pkg/workflow/controller/controllers/workflowrun.go b/pkg/workflow/controller/controllers/workflowrun.go index 39dc396c5..3876bf5b3 100644 --- a/pkg/workflow/controller/controllers/workflowrun.go +++ b/pkg/workflow/controller/controllers/workflowrun.go @@ -2,6 +2,7 @@ package controllers import ( "reflect" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" @@ -10,7 +11,6 @@ import ( "github.com/caicloud/cyclone/pkg/k8s/clientset" "github.com/caicloud/cyclone/pkg/k8s/informers" "github.com/caicloud/cyclone/pkg/meta" - "github.com/caicloud/cyclone/pkg/workflow/common" "github.com/caicloud/cyclone/pkg/workflow/controller" handlers "github.com/caicloud/cyclone/pkg/workflow/controller/handlers/workflowrun" "github.com/caicloud/cyclone/pkg/workflow/workflowrun" @@ -21,7 +21,7 @@ func NewWorkflowRunController(client clientset.Interface) *Controller { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) factory := informers.NewSharedInformerFactoryWithOptions( client, - common.ResyncPeriod, + controller.Config.ResyncPeriodSeconds*time.Second, informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = meta.WorkflowRunSelector() }), diff --git a/pkg/workflow/workflowrun/limits.go b/pkg/workflow/workflowrun/limits.go index ad3046a27..887828eae 100644 --- a/pkg/workflow/workflowrun/limits.go +++ b/pkg/workflow/workflowrun/limits.go @@ -11,7 +11,7 @@ import ( "github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1" "github.com/caicloud/cyclone/pkg/k8s/clientset" - "github.com/caicloud/cyclone/pkg/workflow/common" + "github.com/caicloud/cyclone/pkg/workflow/controller" ) // LimitedQueues manages WorkflowRun queue for each Workflow. Queue for each Workflow is limited to @@ -102,7 +102,7 @@ func scanQueue(q *LimitedSortedQueue) { // If the node's refresh time is old enough compared to the resync time // (5 minutes by default) of WorkflowRun Controller, it means the WorkflowRun // is actually removed from etcd somehow, so we will remove it also here. - if h.next.refresh.Add(common.ResyncPeriod * 2).Before(time.Now()) { + if h.next.refresh.Add(controller.Config.ResyncPeriodSeconds * time.Second * 2).Before(time.Now()) { log.WithField("wfr", h.next.wfr).Info("remove wfr with outdated refresh time from queue") h.next = h.next.next q.size--