Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(controller): Enhanced pod clean-up scalability #4728

Merged
merged 14 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func NewRootCommand() *cobra.Command {
workflowWorkers int // --workflow-workers
workflowTTLWorkers int // --workflow-ttl-workers
podWorkers int // --pod-workers
podCleanupWorkers int // --pod-cleanup-workers
burst int
qps float32
namespaced bool // --namespaced
Expand Down Expand Up @@ -85,7 +86,7 @@ func NewRootCommand() *cobra.Command {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podWorkers)
go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podWorkers, podCleanupWorkers)

// Wait forever
select {}
Expand All @@ -103,6 +104,7 @@ func NewRootCommand() *cobra.Command {
command.Flags().IntVar(&workflowWorkers, "workflow-workers", 32, "Number of workflow workers")
command.Flags().IntVar(&workflowTTLWorkers, "workflow-ttl-workers", 4, "Number of workflow TTL workers")
command.Flags().IntVar(&podWorkers, "pod-workers", 32, "Number of pod workers")
command.Flags().IntVar(&podCleanupWorkers, "pod-cleanup-workers", 4, "Number of pod cleanup workers")
command.Flags().IntVar(&burst, "burst", 30, "Maximum burst for throttle.")
command.Flags().Float32Var(&qps, "qps", 20.0, "Queries per second")
command.Flags().BoolVar(&namespaced, "namespaced", false, "run workflow-controller as namespaced mode")
Expand Down
27 changes: 19 additions & 8 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,21 +1176,32 @@ var _ ArgumentsProvider = &Arguments{}
// Nodes is a collection of NodeStatus values keyed by string.
// NOTE(review): the key is presumably the node ID — confirm against the code that populates the map.
type Nodes map[string]NodeStatus

// FindByDisplayName returns a node whose DisplayName equals name, or nil if
// there is none. It delegates to Find with a display-name predicate.
func (n Nodes) FindByDisplayName(name string) *NodeStatus {
	hasName := NodeWithDisplayName(name)
	return n.Find(hasName)
}

// Any reports whether at least one node satisfies the predicate f.
// It is expressed in terms of Find: a non-nil result means a match exists.
func (in Nodes) Any(f func(NodeStatus) bool) bool {
	match := in.Find(f)
	return match != nil
}

func (n Nodes) Find(f func(NodeStatus) bool) *NodeStatus {
for _, i := range n {
if i.DisplayName == name {
if f(i) {
return &i
}
}
return nil
}

func (in Nodes) Any(f func(node NodeStatus) bool) bool {
for _, i := range in {
if f(i) {
return true
}
}
return false
// NodeWithDisplayName builds a predicate that is true for exactly those nodes
// whose DisplayName equals name. The result can be passed to Nodes.Find.
func NodeWithDisplayName(name string) func(n NodeStatus) bool {
	return func(candidate NodeStatus) bool {
		return candidate.DisplayName == name
	}
}

// FailedPodNode is a node predicate: it is true when n is a pod node
// (Type == NodeTypePod) whose Phase is NodeFailed.
func FailedPodNode(n NodeStatus) bool {
	if n.Type != NodeTypePod {
		return false
	}
	return n.Phase == NodeFailed
}

// SucceededPodNode is a node predicate: it is true when n is a pod node
// (Type == NodeTypePod) whose Phase is NodeSucceeded.
func SucceededPodNode(n NodeStatus) bool {
	if n.Type != NodeTypePod {
		return false
	}
	return n.Phase == NodeSucceeded
}

// UserContainer is a container specified by a user.
Expand Down
26 changes: 26 additions & 0 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,32 @@ func (t *Then) ExpectWorkflowDeleted() *Then {
return t
}

// ExpectWorkflowNode runs f against a node of the workflow that matches
// selector, together with that node's pod.
//
// The values handed to f are:
//   - status: the matching node, or nil if no node matches the selector.
//   - pod: the pod whose name is the node's ID, or nil if no node matched, the
//     node is not of type NodeTypePod, or the pod does not exist (e.g. because
//     it was deleted).
//
// Any error other than "not found" while fetching the pod fails the test
// immediately. The receiver is returned so that expectations can be chained.
func (t *Then) ExpectWorkflowNode(selector func(status wfv1.NodeStatus) bool, f func(t *testing.T, status *wfv1.NodeStatus, pod *apiv1.Pod)) *Then {
	return t.expectWorkflow(t.wf.Name, func(tt *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
		n := status.Nodes.Find(selector)
		if n == nil {
			println("Did not find node")
			f(tt, nil, nil)
			return
		}
		println("Found node", "id="+n.ID, "type="+n.Type)

		var p *apiv1.Pod
		if n.Type == wfv1.NodeTypePod {
			pod, err := t.kubeClient.CoreV1().Pods(t.wf.Namespace).Get(n.ID, metav1.GetOptions{})
			switch {
			case err == nil:
				p = pod
			case apierr.IsNotFound(err):
				// A missing pod is an expected outcome. p is only assigned on
				// success, so f sees nil here regardless of what Get returned
				// alongside the error.
			default:
				// Fail on the *testing.T this callback was handed (the same one
				// passed on to f), not on the one captured by the fixture.
				tt.Fatal(err)
			}
		}
		f(tt, n, p)
	})
}

func (t *Then) ExpectCron(block func(t *testing.T, cronWf *wfv1.CronWorkflow)) *Then {
t.t.Helper()
if t.cronWf == nil {
Expand Down
195 changes: 195 additions & 0 deletions test/e2e/pod_cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// +build e2e

package e2e

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/e2e/fixtures"
"github.com/argoproj/argo/workflow/common"
)

// PodCleanupSuite groups the e2e tests that check which pods still exist after a
// workflow has finished, for each podGC strategy. It embeds fixtures.E2ESuite.
type PodCleanupSuite struct {
fixtures.E2ESuite
}

// enoughTimeForPodCleanup is how long each test in this file waits after
// WaitForWorkflow before it checks whether the workflow's pods still exist.
const enoughTimeForPodCleanup = 5 * time.Second

// TestNone submits a workflow that declares no podGC strategy and checks that,
// after the clean-up wait, the pod of the succeeded node still exists and
// carries the completed label with the value "true".
func (s *PodCleanupSuite) TestNone() {
	const manifest = `
metadata:
generateName: test-pod-cleanup-
labels:
argo-e2e: true
spec:
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v2
`
	// Both the node and its pod must be present before the label is inspected.
	podKeptAndLabelled := func(t *testing.T, node *wfv1.NodeStatus, pod *corev1.Pod) {
		if !assert.NotNil(t, node) || !assert.NotNil(t, pod) {
			return
		}
		assert.Equal(t, "true", pod.Labels[common.LabelKeyCompleted])
	}

	s.Given().
		Workflow(manifest).
		When().
		SubmitWorkflow().
		WaitForWorkflow().
		Wait(enoughTimeForPodCleanup).
		Then().
		ExpectWorkflowNode(wfv1.SucceededPodNode, podKeptAndLabelled)
}

// TestOnPodCompletion submits a workflow with podGC strategy OnPodCompletion
// that has one succeeding and one failing step, and checks that after the
// clean-up wait the pods of both the failed and the succeeded node are gone.
func (s *PodCleanupSuite) TestOnPodCompletion() {
	// generateName names the strategy under test so that workflows created by
	// this test can be told apart from those created by TestOnPodSuccess.
	s.Given().
		Workflow(`
metadata:
generateName: test-pod-cleanup-on-pod-completion-
labels:
argo-e2e: true
spec:
podGC:
strategy: OnPodCompletion
entrypoint: main
templates:
- name: main
steps:
- - name: success
template: success
- name: failure
template: failure
- name: success
container:
image: argoproj/argosay:v2
- name: failure
container:
image: argoproj/argosay:v2
args: [exit, 1]
`).
		When().
		SubmitWorkflow().
		WaitForWorkflow().
		Wait(enoughTimeForPodCleanup).
		Then().
		ExpectWorkflowNode(wfv1.FailedPodNode, func(t *testing.T, n *wfv1.NodeStatus, p *corev1.Pod) {
			// The node must still be recorded in the workflow status even though its pod is gone.
			if assert.NotNil(t, n) {
				assert.Nil(t, p, "failed pod is deleted")
			}
		}).
		ExpectWorkflowNode(wfv1.SucceededPodNode, func(t *testing.T, n *wfv1.NodeStatus, p *corev1.Pod) {
			if assert.NotNil(t, n) {
				assert.Nil(t, p, "successful pod is deleted")
			}
		})
}

// TestOnPodSuccess submits a workflow with podGC strategy OnPodSuccess that has
// one succeeding and one failing step. After the clean-up wait it expects the
// failed node's pod to still exist and the succeeded node's pod to be gone.
func (s *PodCleanupSuite) TestOnPodSuccess() {
	const manifest = `
metadata:
generateName: test-pod-cleanup-on-pod-success-
labels:
argo-e2e: true
spec:
podGC:
strategy: OnPodSuccess
entrypoint: main
templates:
- name: main
steps:
- - name: success
template: success
- name: failure
template: failure
- name: success
container:
image: argoproj/argosay:v2
- name: failure
container:
image: argoproj/argosay:v2
args: [exit, 1]
`
	failedPodKept := func(t *testing.T, node *wfv1.NodeStatus, pod *corev1.Pod) {
		if !assert.NotNil(t, node) {
			return
		}
		assert.NotNil(t, pod, "failed pod is NOT deleted")
	}
	succeededPodGone := func(t *testing.T, node *wfv1.NodeStatus, pod *corev1.Pod) {
		if !assert.NotNil(t, node) {
			return
		}
		assert.Nil(t, pod, "successful pod is deleted")
	}

	s.Given().
		Workflow(manifest).
		When().
		SubmitWorkflow().
		WaitForWorkflow().
		Wait(enoughTimeForPodCleanup).
		Then().
		ExpectWorkflowNode(wfv1.FailedPodNode, failedPodKept).
		ExpectWorkflowNode(wfv1.SucceededPodNode, succeededPodGone)
}

// TestOnWorkflowCompletion submits a workflow with podGC strategy
// OnWorkflowCompletion whose only container is run with args [exit, 1], and
// expects the failed node's pod to be gone after the clean-up wait.
func (s *PodCleanupSuite) TestOnWorkflowCompletion() {
	const manifest = `
metadata:
generateName: test-pod-cleanup-on-workflow-completion-
labels:
argo-e2e: true
spec:
podGC:
strategy: OnWorkflowCompletion
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v2
args: [exit, 1]
`
	failedPodGone := func(t *testing.T, node *wfv1.NodeStatus, pod *corev1.Pod) {
		if !assert.NotNil(t, node) {
			return
		}
		assert.Nil(t, pod, "failed pod is deleted")
	}

	s.Given().
		Workflow(manifest).
		When().
		SubmitWorkflow().
		WaitForWorkflow().
		Wait(enoughTimeForPodCleanup).
		Then().
		ExpectWorkflowNode(wfv1.FailedPodNode, failedPodGone)
}

// TestOnWorkflowSuccess submits a workflow with podGC strategy
// OnWorkflowSuccess and expects the succeeded node's pod to be gone after the
// clean-up wait.
func (s *PodCleanupSuite) TestOnWorkflowSuccess() {
	const manifest = `
metadata:
generateName: test-pod-cleanup-on-workflow-success-
labels:
argo-e2e: true
spec:
podGC:
strategy: OnWorkflowSuccess
entrypoint: main
templates:
- name: main
container:
image: argoproj/argosay:v2
`
	succeededPodGone := func(t *testing.T, node *wfv1.NodeStatus, pod *corev1.Pod) {
		if !assert.NotNil(t, node) {
			return
		}
		assert.Nil(t, pod, "successful pod is deleted")
	}

	s.Given().
		Workflow(manifest).
		When().
		SubmitWorkflow().
		WaitForWorkflow().
		Wait(enoughTimeForPodCleanup).
		Then().
		ExpectWorkflowNode(wfv1.SucceededPodNode, succeededPodGone)
}

func TestPodCleanupSuite(t *testing.T) {
suite.Run(t, new(PodCleanupSuite))
}
2 changes: 1 addition & 1 deletion test/e2e/testdata/sleepy-retry-on-error-workflow.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
kind: Workflow
apiVersion: argoproj.io/v1alpha1
metadata:
name: sleepy-retry-on-error
generateName: sleepy-retry-on-error-
labels:
argo-e2e: "true"
spec:
Expand Down