Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: nodeAntiAffinity is not working as expected when boundaryID is empty. Fixes: #9193 #12701

Merged
merged 9 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ var (
return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeRunning
}), "to have running pod"
}
ToHaveFailedPod Condition = func(wf *wfv1.Workflow) (bool, string) {
return wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeFailed
}), "to have failed pod"
}
)

// `ToBeDone` replaces `ToFinish` which also makes sure the workflow is both complete not pending archiving.
Expand Down
39 changes: 39 additions & 0 deletions test/e2e/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,45 @@ spec:
})
}

func (s *RetryTestSuite) TestRetryNodeAntiAffinity() {
s.Given().
Workflow(`
metadata:
name: test-nodeantiaffinity-strategy
spec:
entrypoint: main
templates:
- name: main
retryStrategy:
limit: '1'
retryPolicy: "Always"
affinity:
nodeAntiAffinity: {}
container:
name: main
image: 'argoproj/argosay:v2'
args: [ exit, "1" ]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToHaveFailedPod).
Wait(5 * time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
if status.Phase == wfv1.WorkflowFailed {
nodeStatus := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
nodeStatusRetry := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
assert.NotEqual(t, nodeStatus.HostNodeName, nodeStatusRetry.HostNodeName)
}
if status.Phase == wfv1.WorkflowRunning {
nodeStatus := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
nodeStatusRetry := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
assert.Contains(t, nodeStatusRetry.Message, "1 node(s) didn't match Pod's node affinity/selector")
assert.NotEqual(t, nodeStatus.HostNodeName, nodeStatusRetry.HostNodeName)
shuangkun marked this conversation as resolved.
Show resolved Hide resolved
}
})
}

func TestRetrySuite(t *testing.T) {
suite.Run(t, new(RetryTestSuite))
}
101 changes: 101 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7575,6 +7575,107 @@ func TestRetryOnDiffHost(t *testing.T) {
assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)
}

var nodeAntiAffinityWorkflow = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: retry-fail
spec:
entrypoint: retry-fail
templates:
- name: retry-fail
retryStrategy:
limit: 2
retryPolicy: "Always"
affinity:
nodeAntiAffinity: {}
script:
image: python:alpine3.6
command: [python]
source: |
exit(1)
`

func TestRetryOnNodeAntiAffinity(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(nodeAntiAffinityWorkflow)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

pods, err := listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 1, len(pods.Items))

// First retry
pod := pods.Items[0]
pod.Spec.NodeName = "node0"
_, err = controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()).Update(ctx, &pod, metav1.UpdateOptions{})
assert.NoError(t, err)
makePodsPhase(ctx, woc, apiv1.PodFailed)
woc.operate(ctx)

node := woc.wf.Status.Nodes.FindByDisplayName("retry-fail(0)")
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
assert.Equal(t, "node0", node.HostNodeName)
}

pods, err = listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 2, len(pods.Items))

var podRetry1 apiv1.Pod
for _, p := range pods.Items {
if p.Name != pod.GetName() {
podRetry1 = p
}
}

hostSelector := "kubernetes.io/hostname"
targetNodeSelectorRequirement := podRetry1.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0]
sourceNodeSelectorRequirement := apiv1.NodeSelectorRequirement{
Key: hostSelector,
Operator: apiv1.NodeSelectorOpNotIn,
Values: []string{node.HostNodeName},
}
assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)

// Second retry
podRetry1.Spec.NodeName = "node1"
_, err = controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()).Update(ctx, &podRetry1, metav1.UpdateOptions{})
assert.NoError(t, err)
makePodsPhase(ctx, woc, apiv1.PodFailed)
woc.operate(ctx)

node1 := woc.wf.Status.Nodes.FindByDisplayName("retry-fail(1)")
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeFailed, node1.Phase)
assert.Equal(t, "node1", node1.HostNodeName)
}

pods, err = listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 3, len(pods.Items))

var podRetry2 apiv1.Pod
for _, p := range pods.Items {
if p.Name != pod.GetName() && p.Name != podRetry1.GetName() {
podRetry2 = p
}
}

targetNodeSelectorRequirement = podRetry2.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0]
sourceNodeSelectorRequirement = apiv1.NodeSelectorRequirement{
Key: hostSelector,
Operator: apiv1.NodeSelectorOpNotIn,
Values: []string{node1.HostNodeName, node.HostNodeName},
}
assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)
}

var noPodsWhenShutdown = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
23 changes: 9 additions & 14 deletions workflow/controller/retry_tweak.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@ type RetryTweak = func(retryStrategy wfv1.RetryStrategy, nodes wfv1.Nodes, pod *
func FindRetryNode(nodes wfv1.Nodes, nodeID string) *wfv1.NodeStatus {
boundaryID := nodes[nodeID].BoundaryID
boundaryNode := nodes[boundaryID]
if boundaryNode.TemplateName != "" {
templateName := boundaryNode.TemplateName
for _, node := range nodes {
if node.Type == wfv1.NodeTypeRetry && node.TemplateName == templateName {
return &node
}
for _, node := range nodes {
if node.Type != wfv1.NodeTypeRetry {
continue
}
}
if boundaryNode.TemplateRef != nil {
templateRef := boundaryNode.TemplateRef
for _, node := range nodes {
if node.Type == wfv1.NodeTypeRetry && node.TemplateRef != nil && node.TemplateRef.Name == templateRef.Name && node.TemplateRef.Template == templateRef.Template {
return &node
}
if boundaryID == "" && node.HasChild(nodeID) {
return &node
} else if boundaryNode.TemplateName != "" && node.TemplateName == boundaryNode.TemplateName {
return &node
} else if boundaryNode.TemplateRef != nil && node.TemplateRef != nil && node.TemplateRef.Name == boundaryNode.TemplateRef.Name && node.TemplateRef.Template == boundaryNode.TemplateRef.Template {
return &node
}
}

return nil
}

Expand Down
Loading