Skip to content

Commit

Permalink
feat(controllers): make resyncPeriod configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Jian Zeng committed Nov 5, 2020
1 parent 72baf7d commit 42498b6
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 19 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
ROOT := github.com/caicloud/cyclone

# Target binaries. You can build multiple binaries for a single project.
TARGETS := server workflow/controller workflow/coordinator cicd/cd toolbox/fstream
IMAGES := server workflow/controller workflow/coordinator resolver/git resolver/svn resolver/image resolver/http watcher cicd/cd cicd/sonarqube toolbox
TARGETS ?= server workflow/controller workflow/coordinator cicd/cd toolbox/fstream
IMAGES ?= server workflow/controller workflow/coordinator resolver/git resolver/svn resolver/image resolver/http watcher cicd/cd cicd/sonarqube toolbox
BASE_IMAGES := base/alpine base/openjdk

# Container image prefix and suffix added to targets.
Expand Down Expand Up @@ -241,4 +241,4 @@ clean:
clean-generated:
-rm -rf ./pkg/k8s/informers
-rm -rf ./pkg/k8s/clientset
-rm -rf ./pkg/k8s/listers
-rm -rf ./pkg/k8s/listers
4 changes: 0 additions & 4 deletions pkg/workflow/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@ package common

import (
"fmt"
"time"
)

// ContainerState represents container state.
type ContainerState string

const (
// ResyncPeriod defines resync period for controllers
ResyncPeriod = time.Minute * 3

// EnvStagePodName is an environment which represents pod name.
EnvStagePodName = "POD_NAME"
// EnvStageInfo is an environment which represents stage information.
Expand Down
13 changes: 12 additions & 1 deletion pkg/workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type WorkflowControllerConfig struct {
DindSettings DindSettings `json:"dind"`
// WorkersNumber defines workers number for various controller
WorkersNumber WorkersNumber `json:"workers_number"`
// ResyncPeriodSeconds defines resync period in seconds for controllers
ResyncPeriodSeconds time.Duration `json:"resync_period_seconds"`
}

// LoggingConfig configures logging
Expand Down Expand Up @@ -143,12 +145,13 @@ func LoadConfig(cm *corev1.ConfigMap) error {
return err
}

defaultValues(&Config)
if !validate(&Config) {
return fmt.Errorf("validate config failed")
}

defaultValues(&Config)
InitLogger(&Config.Logging)
log.Info("ResyncPeriod is %s", Config.ResyncPeriodSeconds*time.Second)
return nil
}

Expand All @@ -157,12 +160,20 @@ func validate(config *WorkflowControllerConfig) bool {
if config.ExecutionContext.PVC == "" {
log.Warn("PVC not configured, resources won't be shared among stages and artifacts unsupported.")
}
if config.ResyncPeriodSeconds < 0 {
log.Errorf("Invalid ResyncPeriodSeconds: %d", config.ResyncPeriodSeconds)
return false
}

return true
}

// defaultValues give the config some default value if they are not set.
func defaultValues(config *WorkflowControllerConfig) {
if config.ResyncPeriodSeconds == 0 {
config.ResyncPeriodSeconds = 180
log.Info("ResyncPeriodSeconds not configured, will use default value '180'")
}
if config.WorkersNumber.ExecutionCluster == 0 {
config.WorkersNumber.ExecutionCluster = 1
log.Info("WorkersNumber.ExecutionCluster not configured, will use default value '1'")
Expand Down
5 changes: 3 additions & 2 deletions pkg/workflow/controller/controllers/exucution_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package controllers

import (
"reflect"
"time"

"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/k8s/informers"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
"github.com/caicloud/cyclone/pkg/workflow/controller/handlers/executioncluster"
)

Expand All @@ -17,7 +18,7 @@ func NewExecutionClusterController(client clientset.Interface) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
client,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
)

informer := factory.Cyclone().V1alpha1().ExecutionClusters().Informer()
Expand Down
5 changes: 3 additions & 2 deletions pkg/workflow/controller/controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand All @@ -11,7 +12,7 @@ import (

"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/meta"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
"github.com/caicloud/cyclone/pkg/workflow/controller/handlers/pod"
)

Expand All @@ -20,7 +21,7 @@ func NewPodController(clusterClient kubernetes.Interface, client clientset.Inter
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
clusterClient,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = meta.CyclonePodSelector()
}),
Expand Down
5 changes: 3 additions & 2 deletions pkg/workflow/controller/controllers/workflow_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"reflect"
"time"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/k8s/informers"
"github.com/caicloud/cyclone/pkg/meta"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
"github.com/caicloud/cyclone/pkg/workflow/controller/handlers/workflowtrigger"
)

Expand All @@ -20,7 +21,7 @@ func NewWorkflowTriggerController(client clientset.Interface) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
client,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = meta.WorkflowTriggerSelector()
}),
Expand Down
5 changes: 3 additions & 2 deletions pkg/workflow/controller/controllers/workflowrun.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package controllers

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/k8s/informers"
"github.com/caicloud/cyclone/pkg/meta"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
handlers "github.com/caicloud/cyclone/pkg/workflow/controller/handlers/workflowrun"
)
Expand All @@ -18,7 +19,7 @@ func NewWorkflowRunController(client clientset.Interface) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
client,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = meta.WorkflowRunSelector()
}),
Expand Down
2 changes: 1 addition & 1 deletion pkg/workflow/controller/handlers/workflowrun/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (h *Handler) Reconcile(obj interface{}) error {
attemptAction := h.ParallelismController.AttemptNew(originWfr.Namespace, originWfr.Spec.WorkflowRef.Name, originWfr.Name)
switch attemptAction {
case workflowrun.AttemptActionQueued:
log.WithField("wfr", originWfr.Name).Infof("Too many WorkflowRun are running, stay pending in queue, will retry in %s", common.ResyncPeriod.String())
log.WithField("wfr", originWfr.Name).Infof("Too many WorkflowRun are running, stay pending in queue, will retry in %d seconds", controller.Config.ResyncPeriodSeconds)
return fmt.Errorf("too many WorkflowRun are running")
case workflowrun.AttemptActionFailed:
if err := h.SetStatus(originWfr.Namespace, originWfr.Name, &v1alpha1.Status{
Expand Down
4 changes: 2 additions & 2 deletions pkg/workflow/workflowrun/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
)

// LimitedQueues manages WorkflowRun queue for each Workflow. Queue for each Workflow is limited to
Expand Down Expand Up @@ -102,7 +102,7 @@ func scanQueue(q *LimitedSortedQueue) {
// If the node's refresh time is old enough compared to the resync time
// (5 minutes by default) of WorkflowRun Controller, it means the WorkflowRun
// is actually removed from etcd somehow, so we will remove it also here.
if h.next.refresh.Add(common.ResyncPeriod * 2).Before(time.Now()) {
if h.next.refresh.Add(controller.Config.ResyncPeriodSeconds * time.Second * 2).Before(time.Now()) {
log.WithField("wfr", h.next.wfr).Info("remove wfr with outdated refresh time from queue")
h.next = h.next.next
q.size--
Expand Down

0 comments on commit 42498b6

Please sign in to comment.