Skip to content

Commit

Permalink
fix: nodeAntiAffinity is not working as expected when boundaryID is e…
Browse files Browse the repository at this point in the history
…mpty. Fixes: #9193 (#12701)

Signed-off-by: shuangkun <tsk2013uestc@163.com>
(cherry picked from commit e490d48)
  • Loading branch information
shuangkun authored and agilgur5 committed May 27, 2024
1 parent 210f1f9 commit ca947f3
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 14 deletions.
5 changes: 5 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ var (
return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeRunning
}), "to have running pod"
}
ToHaveFailedPod Condition = func(wf *wfv1.Workflow) (bool, string) {
return wf.Status.Nodes.Any(func(node wfv1.NodeStatus) bool {
return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeFailed
}), "to have failed pod"
}
)

// `ToBeDone` replaces `ToFinish` which also makes sure the workflow is both complete not pending archiving.
Expand Down
39 changes: 39 additions & 0 deletions test/e2e/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,45 @@ spec:
})
}

func (s *RetryTestSuite) TestRetryNodeAntiAffinity() {
s.Given().
Workflow(`
metadata:
name: test-nodeantiaffinity-strategy
spec:
entrypoint: main
templates:
- name: main
retryStrategy:
limit: '1'
retryPolicy: "Always"
affinity:
nodeAntiAffinity: {}
container:
name: main
image: 'argoproj/argosay:v2'
args: [ exit, "1" ]
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToHaveFailedPod).
Wait(5 * time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
if status.Phase == wfv1.WorkflowFailed {
nodeStatus := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
nodeStatusRetry := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
assert.NotEqual(t, nodeStatus.HostNodeName, nodeStatusRetry.HostNodeName)
}
if status.Phase == wfv1.WorkflowRunning {
nodeStatus := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(0)")
nodeStatusRetry := status.Nodes.FindByDisplayName("test-nodeantiaffinity-strategy(1)")
assert.Contains(t, nodeStatusRetry.Message, "1 node(s) didn't match Pod's node affinity/selector")
assert.NotEqual(t, nodeStatus.HostNodeName, nodeStatusRetry.HostNodeName)
}
})
}

func TestRetrySuite(t *testing.T) {
suite.Run(t, new(RetryTestSuite))
}
101 changes: 101 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7380,6 +7380,107 @@ func TestRetryOnDiffHost(t *testing.T) {
assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)
}

var nodeAntiAffinityWorkflow = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: retry-fail
spec:
entrypoint: retry-fail
templates:
- name: retry-fail
retryStrategy:
limit: 2
retryPolicy: "Always"
affinity:
nodeAntiAffinity: {}
script:
image: python:alpine3.6
command: [python]
source: |
exit(1)
`

func TestRetryOnNodeAntiAffinity(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(nodeAntiAffinityWorkflow)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

pods, err := listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 1, len(pods.Items))

// First retry
pod := pods.Items[0]
pod.Spec.NodeName = "node0"
_, err = controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()).Update(ctx, &pod, metav1.UpdateOptions{})
assert.NoError(t, err)
makePodsPhase(ctx, woc, apiv1.PodFailed)
woc.operate(ctx)

node := woc.wf.Status.Nodes.FindByDisplayName("retry-fail(0)")
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
assert.Equal(t, "node0", node.HostNodeName)
}

pods, err = listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 2, len(pods.Items))

var podRetry1 apiv1.Pod
for _, p := range pods.Items {
if p.Name != pod.GetName() {
podRetry1 = p
}
}

hostSelector := "kubernetes.io/hostname"
targetNodeSelectorRequirement := podRetry1.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0]
sourceNodeSelectorRequirement := apiv1.NodeSelectorRequirement{
Key: hostSelector,
Operator: apiv1.NodeSelectorOpNotIn,
Values: []string{node.HostNodeName},
}
assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)

// Second retry
podRetry1.Spec.NodeName = "node1"
_, err = controller.kubeclientset.CoreV1().Pods(woc.wf.GetNamespace()).Update(ctx, &podRetry1, metav1.UpdateOptions{})
assert.NoError(t, err)
makePodsPhase(ctx, woc, apiv1.PodFailed)
woc.operate(ctx)

node1 := woc.wf.Status.Nodes.FindByDisplayName("retry-fail(1)")
if assert.NotNil(t, node) {
assert.Equal(t, wfv1.NodeFailed, node1.Phase)
assert.Equal(t, "node1", node1.HostNodeName)
}

pods, err = listPods(woc)
assert.NoError(t, err)
assert.Equal(t, 3, len(pods.Items))

var podRetry2 apiv1.Pod
for _, p := range pods.Items {
if p.Name != pod.GetName() && p.Name != podRetry1.GetName() {
podRetry2 = p
}
}

targetNodeSelectorRequirement = podRetry2.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0]
sourceNodeSelectorRequirement = apiv1.NodeSelectorRequirement{
Key: hostSelector,
Operator: apiv1.NodeSelectorOpNotIn,
Values: []string{node1.HostNodeName, node.HostNodeName},
}
assert.Equal(t, sourceNodeSelectorRequirement, targetNodeSelectorRequirement)
}

var noPodsWhenShutdown = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
23 changes: 9 additions & 14 deletions workflow/controller/retry_tweak.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@ type RetryTweak = func(retryStrategy wfv1.RetryStrategy, nodes wfv1.Nodes, pod *
func FindRetryNode(nodes wfv1.Nodes, nodeID string) *wfv1.NodeStatus {
boundaryID := nodes[nodeID].BoundaryID
boundaryNode := nodes[boundaryID]
if boundaryNode.TemplateName != "" {
templateName := boundaryNode.TemplateName
for _, node := range nodes {
if node.Type == wfv1.NodeTypeRetry && node.TemplateName == templateName {
return &node
}
for _, node := range nodes {
if node.Type != wfv1.NodeTypeRetry {
continue
}
}
if boundaryNode.TemplateRef != nil {
templateRef := boundaryNode.TemplateRef
for _, node := range nodes {
if node.Type == wfv1.NodeTypeRetry && node.TemplateRef != nil && node.TemplateRef.Name == templateRef.Name && node.TemplateRef.Template == templateRef.Template {
return &node
}
if boundaryID == "" && node.HasChild(nodeID) {
return &node
} else if boundaryNode.TemplateName != "" && node.TemplateName == boundaryNode.TemplateName {
return &node
} else if boundaryNode.TemplateRef != nil && node.TemplateRef != nil && node.TemplateRef.Name == boundaryNode.TemplateRef.Name && node.TemplateRef.Template == boundaryNode.TemplateRef.Template {
return &node
}
}

return nil
}

Expand Down

0 comments on commit ca947f3

Please sign in to comment.