Skip to content

Commit

Permalink
feat(controllers): make resyncPeriod configurable (#1520)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jian Zeng committed Nov 5, 2020
1 parent 4bb132f commit 56c9019
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 14 deletions.
4 changes: 0 additions & 4 deletions pkg/workflow/common/const.go
Expand Up @@ -2,16 +2,12 @@ package common

import (
"fmt"
"time"
)

// ContainerState represents container state.
type ContainerState string

const (
// ResyncPeriod defines resync period for controllers
ResyncPeriod = time.Minute * 5

// EnvStagePodName is an environment which represents pod name.
EnvStagePodName = "POD_NAME"
// EnvStageInfo is an environment which represents stage information.
Expand Down
10 changes: 10 additions & 0 deletions pkg/workflow/controller/config.go
Expand Up @@ -50,6 +50,8 @@ type WorkflowControllerConfig struct {
NotificationURL string `json:"notification_url"`
// DindSettings is settings for Docker in Docker
DindSettings DindSettings `json:"dind"`
// ResyncPeriodSeconds defines resync period in seconds for controllers
ResyncPeriodSeconds time.Duration `json:"resync_period_seconds"`
}

// LoggingConfig configures logging
Expand Down Expand Up @@ -113,16 +115,24 @@ func LoadConfig(cm *corev1.ConfigMap) error {
return err
}

if Config.ResyncPeriodSeconds == 0 {
Config.ResyncPeriodSeconds = 180 // 3min
}
if !validate(&Config) {
return fmt.Errorf("validate config failed")
}

InitLogger(&Config.Logging)
log.Infof("ResyncPeriod is %s", Config.ResyncPeriodSeconds*time.Second)
return nil
}

// validate validates some required configurations.
func validate(config *WorkflowControllerConfig) bool {
if config.ResyncPeriodSeconds < 0 {
log.Errorf("Invalid ResyncPeriodSeconds: %d", config.ResyncPeriodSeconds)
return false
}
if config.ExecutionContext.PVC == "" {
log.Warn("PVC not configured, resources won't be shared among stages and artifacts unsupported.")
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/workflow/controller/controllers/exucution_cluster.go
Expand Up @@ -2,13 +2,14 @@ package controllers

import (
"reflect"
"time"

"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/k8s/informers"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
"github.com/caicloud/cyclone/pkg/workflow/controller/handlers/executioncluster"
)

Expand All @@ -17,7 +18,7 @@ func NewExecutionClusterController(client clientset.Interface) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
client,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
)

informer := factory.Cyclone().V1alpha1().ExecutionClusters().Informer()
Expand Down
5 changes: 3 additions & 2 deletions pkg/workflow/controller/controllers/pod.go
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand All @@ -11,7 +12,7 @@ import (

"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/meta"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
"github.com/caicloud/cyclone/pkg/workflow/controller/handlers/pod"
)

Expand All @@ -20,7 +21,7 @@ func NewPodController(clusterClient kubernetes.Interface, client clientset.Inter
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
clusterClient,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = meta.CyclonePodSelector()
}),
Expand Down
5 changes: 3 additions & 2 deletions pkg/workflow/controller/controllers/workflow_trigger.go
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"reflect"
"time"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/k8s/informers"
"github.com/caicloud/cyclone/pkg/meta"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
"github.com/caicloud/cyclone/pkg/workflow/controller/handlers/workflowtrigger"
)

Expand All @@ -20,7 +21,7 @@ func NewWorkflowTriggerController(client clientset.Interface) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
client,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = meta.WorkflowTriggerSelector()
}),
Expand Down
4 changes: 2 additions & 2 deletions pkg/workflow/controller/controllers/workflowrun.go
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"reflect"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -10,7 +11,6 @@ import (
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/k8s/informers"
"github.com/caicloud/cyclone/pkg/meta"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
handlers "github.com/caicloud/cyclone/pkg/workflow/controller/handlers/workflowrun"
"github.com/caicloud/cyclone/pkg/workflow/workflowrun"
Expand All @@ -21,7 +21,7 @@ func NewWorkflowRunController(client clientset.Interface) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
factory := informers.NewSharedInformerFactoryWithOptions(
client,
common.ResyncPeriod,
controller.Config.ResyncPeriodSeconds*time.Second,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = meta.WorkflowRunSelector()
}),
Expand Down
4 changes: 2 additions & 2 deletions pkg/workflow/workflowrun/limits.go
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/workflow/common"
"github.com/caicloud/cyclone/pkg/workflow/controller"
)

// LimitedQueues manages WorkflowRun queue for each Workflow. Queue for each Workflow is limited to
Expand Down Expand Up @@ -102,7 +102,7 @@ func scanQueue(q *LimitedSortedQueue) {
// If the node's refresh time is old enough compared to the resync time
// (5 minutes by default) of WorkflowRun Controller, it means the WorkflowRun
// is actually removed from etcd somehow, so we will remove it also here.
if h.next.refresh.Add(common.ResyncPeriod * 2).Before(time.Now()) {
if h.next.refresh.Add(controller.Config.ResyncPeriodSeconds * time.Second * 2).Before(time.Now()) {
log.WithField("wfr", h.next.wfr).Info("remove wfr with outdated refresh time from queue")
h.next = h.next.next
q.size--
Expand Down

0 comments on commit 56c9019

Please sign in to comment.