Skip to content

Commit

Permalink
feat: Expose the Cron workflow workers as argument (#11457)
Browse files Browse the repository at this point in the history
Signed-off-by: Saravanan Balasubramanian <sarabala1979@gmail.com>
  • Loading branch information
sarabala1979 committed Jul 28, 2023
1 parent eefa6ec commit be0bdf9
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
6 changes: 4 additions & 2 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func NewRootCommand() *cobra.Command {
workflowWorkers int // --workflow-workers
workflowTTLWorkers int // --workflow-ttl-workers
podCleanupWorkers int // --pod-cleanup-workers
cronWorkflowWorkers int // --cron-workflow-workers
burst int
qps float32
namespaced bool // --namespaced
Expand Down Expand Up @@ -116,7 +117,7 @@ func NewRootCommand() *cobra.Command {
if leaderElectionOff == "true" {
log.Info("Leader election is turned off. Running in single-instance mode")
log.WithField("id", "single-instance").Info("starting leading")
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
} else {
nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewRootCommand() *cobra.Command {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
dummyCancel()
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers)
go wfController.RunMetricsServer(ctx, false)
},
OnStoppedLeading: func() {
Expand Down Expand Up @@ -183,6 +184,7 @@ func NewRootCommand() *cobra.Command {
command.Flags().IntVar(&workflowWorkers, "workflow-workers", 32, "Number of workflow workers")
command.Flags().IntVar(&workflowTTLWorkers, "workflow-ttl-workers", 4, "Number of workflow TTL workers")
command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers")
command.Flags().IntVar(&cronWorkflowWorkers, "cron-workflow-workers", 8, "Number of cron workflow workers")
command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.")
command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second")
command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode")
Expand Down
8 changes: 4 additions & 4 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLW
}
}

func (wfc *WorkflowController) runCronController(ctx context.Context) {
func (wfc *WorkflowController) runCronController(ctx context.Context, cronWorkflowWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager, cronWorkflowWorkers)
cronController.Run(ctx)
}

Expand All @@ -235,7 +235,7 @@ var indexers = cache.Indexers{
}

// Run starts an Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers int) {
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

// init DB after leader election (if enabled)
Expand Down Expand Up @@ -305,7 +305,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.archivedWorkflowGarbageCollector(ctx.Done())

go wfc.runGCcontroller(ctx, workflowTTLWorkers)
go wfc.runCronController(ctx)
go wfc.runCronController(ctx, cronWorkflowWorkers)
go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())

Expand Down
7 changes: 4 additions & 3 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ type Controller struct {
dynamicInterface dynamic.Interface
metrics *metrics.Metrics
eventRecorderManager events.EventRecorderManager
cronWorkflowWorkers int
}

const (
cronWorkflowResyncPeriod = 20 * time.Minute
cronWorkflowWorkers = 8
)

var (
Expand All @@ -68,7 +68,7 @@ func init() {
log.WithField("cronSyncPeriod", cronSyncPeriod).Info("cron config")
}

func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager, cronWorkflowWorkers int) *Controller {
return &Controller{
wfClientset: wfclientset,
namespace: namespace,
Expand All @@ -80,6 +80,7 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic
cronWfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"),
metrics: metrics,
eventRecorderManager: eventRecorderManager,
cronWorkflowWorkers: cronWorkflowWorkers,
}
}

Expand Down Expand Up @@ -110,7 +111,7 @@ func (cc *Controller) Run(ctx context.Context) {

go wait.UntilWithContext(ctx, cc.syncAll, cronSyncPeriod)

for i := 0; i < cronWorkflowWorkers; i++ {
for i := 0; i < cc.cronWorkflowWorkers; i++ {
go wait.Until(cc.runCronWorker, time.Second, ctx.Done())
}

Expand Down

0 comments on commit be0bdf9

Please sign in to comment.