feat(controller): Enhanced TTL controller scalability #4736

Merged: 4 commits, Dec 15, 2020. Changes shown from all commits.
cmd/workflow-controller/main.go (3 additions & 1 deletion)

```diff
@@ -39,6 +39,7 @@ func NewRootCommand() *cobra.Command {
 		logLevel           string // --loglevel
 		glogLevel          int    // --gloglevel
 		workflowWorkers    int    // --workflow-workers
+		workflowTTLWorkers int    // --workflow-ttl-workers
 		podWorkers         int    // --pod-workers
 		burst              int
 		qps                float32
@@ -84,7 +85,7 @@ func NewRootCommand() *cobra.Command {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
-			go wfController.Run(ctx, workflowWorkers, podWorkers)
+			go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podWorkers)
 
 			// Wait forever
 			select {}
```
```diff
@@ -100,6 +101,7 @@ func NewRootCommand() *cobra.Command {
 	command.Flags().StringVar(&logLevel, "loglevel", "info", "Set the logging level. One of: debug|info|warn|error")
 	command.Flags().IntVar(&glogLevel, "gloglevel", 0, "Set the glog logging level")
 	command.Flags().IntVar(&workflowWorkers, "workflow-workers", 32, "Number of workflow workers")
+	command.Flags().IntVar(&workflowTTLWorkers, "workflow-ttl-workers", 4, "Number of workflow TTL workers")
 	command.Flags().IntVar(&podWorkers, "pod-workers", 32, "Number of pod workers")
 	command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.")
 	command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second")
```

> **Member** (on the new `--workflow-ttl-workers` flag): Thanks for adding this. This was my proposal during the prod incident, but we don't want to make too many changes.
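For illustration, a hedged sketch of exercising the new flag through the standard cobra API. Only `NewRootCommand` and the flag name come from this diff; the rest is assumed scaffolding:

```go
// Sketch: start the controller with more TTL workers than the default of 4.
// SetArgs and Execute are standard cobra methods; NewRootCommand is from this diff.
cmd := NewRootCommand()
cmd.SetArgs([]string{"--workflow-ttl-workers", "8"})
if err := cmd.Execute(); err != nil {
	log.Fatal(err)
}
```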
docs/scaling.md (1 addition & 2 deletions)

```diff
@@ -10,8 +10,7 @@ You cannot horizontally scale the controller.
 
 You can scale the controller vertically:
 
-- If you have workflows with many steps, increase `--pod-workers`.
-- If you have many workflows, increase `--workflow-workers`.
+- If you have many workflows, increase `--workflow-workers` and `--workflow-ttl-workers`.
 - Increase both `--qps` and `--burst`.
 
 You will need to increase the controller's memory and CPU.
```

> **Contributor Author** (on the removed `--pod-workers` bullet): I don't recommend changing `--pod-workers` anymore. It has had no impact in any testing I've done.
test/e2e/fixtures/then.go (8 additions)

```diff
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	apiv1 "k8s.io/api/core/v1"
+	apierr "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
@@ -55,7 +56,14 @@ func (t *Then) expectWorkflow(workflowName string, block func(t *testing.T, meta
 		t.t.FailNow()
 	}
 	return t
 }
 
+func (t *Then) ExpectWorkflowDeleted() *Then {
+	_, err := t.client.Get(t.wf.Name, metav1.GetOptions{})
+	if err == nil || !apierr.IsNotFound(err) {
+		t.t.Fatalf("expected workflow to be deleted: %v", err)
+	}
+	return t
+}
 
 func (t *Then) ExpectCron(block func(t *testing.T, cronWf *wfv1.CronWorkflow)) *Then {
```
test/e2e/functional_test.go (24 additions)

```diff
@@ -88,6 +88,30 @@ func (s *FunctionalSuite) TestDeletingRunningPodWithOrErrorRetryPolicy() {
 		})
 }
 
+func (s *FunctionalSuite) TestWorkflowTTL() {
+	s.Given().
+		Workflow(`
+metadata:
+  generateName: workflow-ttl-
+  labels:
+    argo-e2e: true
+spec:
+  ttlStrategy:
+    secondsAfterCompletion: 0
+  entrypoint: main
+  templates:
+    - name: main
+      container:
+        image: argoproj/argosay:v2
+`).
+		When().
+		SubmitWorkflow().
+		WaitForWorkflow().
+		Wait(3 * time.Second). // enough time for the TTL controller to delete the workflow
+		Then().
+		ExpectWorkflowDeleted()
+}
+
 // in this test we create a pod quota, and then we create a workflow that needs one more pod than the quota allows
 // because we run them in parallel, the first node will run to completion, and then the second one
 func (s *FunctionalSuite) TestResourceQuota() {
```
workflow/controller/controller.go (4 additions & 4 deletions)

```diff
@@ -156,9 +156,9 @@ func (wfc *WorkflowController) newThrottler() sync.Throttler {
 }
 
 // runTTLController runs the workflow TTL controller
-func (wfc *WorkflowController) runTTLController(ctx context.Context) {
+func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTLWorkers int) {
 	ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer)
-	err := ttlCtrl.Run(ctx.Done())
+	err := ttlCtrl.Run(ctx.Done(), workflowTTLWorkers)
 	if err != nil {
 		panic(err)
 	}
@@ -178,7 +178,7 @@ var indexers = cache.Indexers{
 }
 
 // Run starts a Workflow resource controller
-func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers int) {
+func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podWorkers int) {
 	defer wfc.wfQueue.ShutDown()
 	defer wfc.podQueue.ShutDown()
@@ -237,7 +237,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
 	go wfc.workflowGarbageCollector(ctx.Done())
 	go wfc.archivedWorkflowGarbageCollector(ctx.Done())
 
-	go wfc.runTTLController(ctx)
+	go wfc.runTTLController(ctx, workflowTTLWorkers)
 	go wfc.runCronController(ctx)
 	go wfc.metrics.RunServer(ctx)
 	go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
```
workflow/metrics/metrics_test.go (1 addition & 1 deletion)

```diff
@@ -104,7 +104,7 @@ func TestMetricGC(t *testing.T) {
 	go m.garbageCollector(ctx)
 
 	// Ensure we get at least one TTL run
-	time.Sleep(1*time.Second + time.Millisecond)
+	time.Sleep(1*time.Second + 100*time.Millisecond)
 
 	assert.Len(t, m.customMetrics, 0)
 }
```
workflow/ttlcontroller/ttlcontroller.go (24 additions & 11 deletions)

```diff
@@ -24,6 +24,8 @@ import (
 
 const (
 	workflowTTLResyncPeriod = 20 * time.Minute
+	// 1s is usually enough time for the informer to get synced and be up-to-date
+	enoughTimeForInformerSync = time.Second
 )
 
 type ConfigSupplier func() *config.Config
@@ -49,28 +51,29 @@ func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedInd
 	wfInformer.AddEventHandler(cache.FilteringResourceEventHandler{
 		FilterFunc: func(obj interface{}) bool {
 			un, ok := obj.(*unstructured.Unstructured)
-			return ok && un.GetLabels()[common.LabelKeyCompleted] == "true" && un.GetLabels()[common.LabelKeyWorkflowArchivingStatus] != "Pending"
+			return ok && un.GetDeletionTimestamp() == nil && un.GetLabels()[common.LabelKeyCompleted] == "true" && un.GetLabels()[common.LabelKeyWorkflowArchivingStatus] != "Pending"
 		},
 		Handler: cache.ResourceEventHandlerFuncs{
 			AddFunc: controller.enqueueWF,
 			UpdateFunc: func(old, new interface{}) {
 				controller.enqueueWF(new)
 			},
-			DeleteFunc: controller.enqueueWF,
 		},
 	})
 	return controller
 }
 
-func (c *Controller) Run(stopCh <-chan struct{}) error {
+func (c *Controller) Run(stopCh <-chan struct{}, workflowTTLWorkers int) error {
 	defer runtimeutil.HandleCrash()
 	defer c.workqueue.ShutDown()
-	log.Infof("Starting workflow TTL controller (resync %v)", c.resyncPeriod)
+	log.Infof("Starting workflow TTL controller (resync %v, workflowTTLWorkers %d)", c.resyncPeriod, workflowTTLWorkers)
 	go c.wfInformer.Run(stopCh)
 	if ok := cache.WaitForCacheSync(stopCh, c.wfInformer.HasSynced); !ok {
 		return fmt.Errorf("failed to wait for caches to sync")
 	}
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	for i := 0; i < workflowTTLWorkers; i++ {
+		go wait.Until(c.runWorker, time.Second, stopCh)
+	}
 	log.Info("Started workflow TTL worker")
 	<-stopCh
 	log.Info("Shutting workflow TTL worker")
```

> **Contributor Author** (on the removed `DeleteFunc`): This is invoked after the workflow is already deleted, so it is not needed. Removing it reduces the number of API requests by 1.
```diff
@@ -129,9 +132,6 @@ func (c *Controller) enqueueWF(obj interface{}) {
 		log.Warnf("'%v' is not an unstructured", obj)
 		return
 	}
-	if un.GetDeletionTimestamp() != nil {
-		return
-	}
 	wf, err := util.FromUnstructured(un)
 	if err != nil {
 		log.Warnf("Failed to unmarshal workflow %v object: %v", obj, err)
```

> **Contributor Author** (on the removed deletion-timestamp check): Redundant now that it is checked by `FilterFunc` on line 52.
```diff
@@ -152,7 +152,12 @@ func (c *Controller) enqueueWF(obj interface{}) {
 		runtimeutil.HandleError(err)
 		return
 	}
-	//c.workqueue.Add(key)
+	// If we would delete within the next second, it is almost certain that the informer is out of
+	// sync: the delete path double-checks whether the workflow in the informer is already deleted,
+	// and we would make 2 API requests when one is enough.
+	if addAfter < enoughTimeForInformerSync {
+		addAfter = enoughTimeForInformerSync
+	}
 	log.Infof("Queueing workflow %s/%s for delete in %v", wf.Namespace, wf.Name, addAfter)
 	c.workqueue.AddAfter(key, addAfter)
 }
```
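To make the floor concrete, a hypothetical helper (illustrative, not code from this PR) that computes the same delay: the time remaining until the TTL expires, floored at `enoughTimeForInformerSync` so the informer has a chance to catch up before the worker double-checks.

```go
// deleteDelay returns how long enqueueWF should wait before queueing a delete.
// Assumes the enoughTimeForInformerSync constant from the diff above.
func deleteDelay(finishedAt time.Time, ttl time.Duration, now time.Time) time.Duration {
	addAfter := finishedAt.Add(ttl).Sub(now)
	if addAfter < enoughTimeForInformerSync {
		addAfter = enoughTimeForInformerSync
	}
	return addAfter
}
```

So a workflow whose TTL expired long ago is still scheduled at least one second out, trading a tiny delay for one fewer API request.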
```diff
@@ -177,6 +182,9 @@ func (c *Controller) deleteWorkflow(key string) error {
 		log.Warnf("Key '%s' in index is not an unstructured", key)
 		return nil
 	}
+	if un.GetDeletionTimestamp() != nil {
+		return nil
+	}
 	wf, err := util.FromUnstructured(un)
 	if err != nil {
 		log.Warnf("Failed to unmarshal key '%s' to workflow object: %v", key, err)
```
```diff
@@ -187,9 +195,14 @@ func (c *Controller) deleteWorkflow(key string) error {
 
 		err = c.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Delete(wf.Name, &metav1.DeleteOptions{PropagationPolicy: commonutil.GetDeletePropagation()})
 		if err != nil {
-			return err
+			if apierr.IsNotFound(err) {
+				log.Infof("workflow already deleted '%s'", key)
+			} else {
+				return err
+			}
+		} else {
+			log.Infof("Successfully deleted '%s'", key)
 		}
-		log.Infof("Successfully deleted '%s'", key)
 	}
 	return nil
 }
```

> **Contributor Author** (on the `IsNotFound` branch): I've not seen this error apart from initial testing; I think it's prevented by line 155, but it's a good bonus check.
workflow/ttlcontroller/ttlcontroller_test.go (22 additions & 18 deletions)

```diff
@@ -352,6 +352,11 @@ func newTTLController() *Controller {
 	}
 }
 
+func enqueueWF(controller *Controller, un *unstructured.Unstructured) {
+	controller.enqueueWF(un)
+	time.Sleep(100*time.Millisecond + enoughTimeForInformerSync)
+}
+
 func TestEnqueueWF(t *testing.T) {
 	var err error
 	var un *unstructured.Unstructured
```
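The helper exists because `enqueueWF` now goes through `AddAfter` with the one-second floor, so a key only reaches the queue after the delay elapses; the sleep gives it time to land before the tests assert on `Len()`. A standalone sketch of that timing (the `workqueue` package is client-go's; the key is illustrative):

```go
q := workqueue.NewDelayingQueue()
q.AddAfter("default/my-wf", time.Second) // what enqueueWF effectively does now
fmt.Println(q.Len())                     // 0: the key is still waiting
time.Sleep(1100 * time.Millisecond)      // cf. the helper's 100ms + enoughTimeForInformerSync
fmt.Println(q.Len())                     // 1: the key has moved onto the queue
```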
```diff
@@ -363,23 +368,23 @@ func TestEnqueueWF(t *testing.T) {
 	wf := test.LoadWorkflowFromBytes([]byte(completedWf))
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller.workqueue.Len())
 
 	// Verify we do not enqueue if the workflow finish time does not exceed the TTL
 	wf.Spec.TTLSecondsAfterFinished = &ten
 	wf.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-5 * time.Second)}
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller.workqueue.Len())
 
 	// Verify we enqueue when the TTL is expired
 	wf.Spec.TTLSecondsAfterFinished = &ten
 	wf.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-11 * time.Second)}
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 1, controller.workqueue.Len())
 }
```
```diff
@@ -396,15 +401,15 @@ func TestTTLStrategySucceded(t *testing.T) {
 	wf.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-5 * time.Second)}
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller.workqueue.Len())
 
 	wf1 := test.LoadWorkflowFromBytes([]byte(succeededWf))
 	wf1.Spec.TTLStrategy = &wfv1.TTLStrategy{SecondsAfterSuccess: &ten}
 	wf1.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-11 * time.Second)}
 	un, err = util.ToUnstructured(wf1)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 1, controller.workqueue.Len())
 
 	wf2 := test.LoadWorkflowFromBytes([]byte(wftRefWithTTLinWFT))
@@ -413,7 +418,7 @@ func TestTTLStrategySucceded(t *testing.T) {
 	assert.NoError(t, err)
 	_, err = controller.wfclientset.ArgoprojV1alpha1().Workflows("default").Create(wf2)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	controller.processNextWorkItem()
 	assert.Equal(t, 1, controller.workqueue.Len())
@@ -423,7 +428,7 @@ func TestTTLStrategySucceded(t *testing.T) {
 	assert.NoError(t, err)
 	_, err = controller.wfclientset.ArgoprojV1alpha1().Workflows("default").Create(wf3)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	controller.processNextWorkItem()
 	assert.Equal(t, 1, controller.workqueue.Len())
@@ -442,19 +447,18 @@ func TestTTLStrategyFailed(t *testing.T) {
 	wf.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-5 * time.Second)}
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller.workqueue.Len())
 
 	wf1 := test.LoadWorkflowFromBytes([]byte(failedWf))
 	wf1.Spec.TTLStrategy = &wfv1.TTLStrategy{SecondsAfterFailure: &ten}
 	wf1.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-11 * time.Second)}
 	un, err = util.ToUnstructured(wf1)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 1, controller.workqueue.Len())
-
 }
 
 func TestNoTTLStrategyFailed(t *testing.T) {
 	var err error
 	var un *unstructured.Unstructured
@@ -464,14 +468,14 @@ func TestNoTTLStrategyFailed(t *testing.T) {
 	wf.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-5 * time.Second)}
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller.workqueue.Len())
 
 	wf1 := test.LoadWorkflowFromBytes([]byte(failedWf))
 	wf1.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-11 * time.Second)}
 	un, err = util.ToUnstructured(wf1)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller.workqueue.Len())
 
 }
@@ -489,7 +493,7 @@ func TestNoTTLStrategyFailedButTTLSecondsAfterFinished(t *testing.T) {
 	wf.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-5 * time.Second)}
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller.workqueue.Len())
 
 	wf1 := test.LoadWorkflowFromBytes([]byte(failedWf))
@@ -499,7 +503,7 @@ func TestNoTTLStrategyFailedButTTLSecondsAfterFinished(t *testing.T) {
 	wf1.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-11 * time.Second)}
 	un, err = util.ToUnstructured(wf1)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 1, controller.workqueue.Len())
 }
@@ -516,7 +520,7 @@ func TestTTLStrategyFromUnstructured(t *testing.T) {
 	wf.Status.FinishedAt = metav1.Time{Time: controller.clock.Now().Add(-6 * time.Second)}
 	un, err = util.ToUnstructured(wf)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 1, controller.workqueue.Len())
 
 	controller1 := newTTLController()
@@ -527,7 +531,7 @@ func TestTTLStrategyFromUnstructured(t *testing.T) {
 	wf1.Status.FinishedAt = metav1.Time{Time: controller1.clock.Now().Add(-6 * time.Second)}
 	un, err = util.ToUnstructured(wf1)
 	assert.NoError(t, err)
-	controller1.enqueueWF(un)
+	enqueueWF(controller1, un)
 	assert.Equal(t, 1, controller1.workqueue.Len())
 
 	controller2 := newTTLController()
@@ -538,7 +542,7 @@ func TestTTLStrategyFromUnstructured(t *testing.T) {
 	wf2.Status.FinishedAt = metav1.Time{Time: controller2.clock.Now().Add(-6 * time.Second)}
 	un, err = util.ToUnstructured(wf2)
 	assert.NoError(t, err)
-	controller2.enqueueWF(un)
+	enqueueWF(controller2, un)
 	assert.Equal(t, 1, controller2.workqueue.Len())
 
 	controller3 := newTTLController()
@@ -549,7 +553,7 @@ func TestTTLStrategyFromUnstructured(t *testing.T) {
 	un, err = util.ToUnstructured(wf3)
 	t.Log(wf3.Spec.TTLStrategy)
 	assert.NoError(t, err)
-	controller.enqueueWF(un)
+	enqueueWF(controller, un)
 	assert.Equal(t, 0, controller3.workqueue.Len())
 }
```