Skip to content

Commit

Permalink
fix: Fix variables not substitue bug when creation failed for the fir…
Browse files Browse the repository at this point in the history
…st time. Fixes (#11487)

Signed-off-by: 刘达 <liuda1@kingsoft.com>
Co-authored-by: 刘达 <liuda1@kingsoft.com>
Co-authored-by: Julie Vogelman <julievogelman0@gmail.com>
  • Loading branch information
3 people authored and sarabala1979 committed Jan 8, 2024
1 parent 29e613e commit b998c50
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 16 deletions.
36 changes: 20 additions & 16 deletions workflow/controller/operator.go
Expand Up @@ -2075,32 +2075,36 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
localScope, realTimeScope := woc.prepareMetricScope(lastChildNode)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}

var retryNum int
if lastChildNode != nil && !lastChildNode.Fulfilled() {
// Last child node is still running.
// Last child node is either still running, or in some cases the corresponding Pod hasn't even been
// created yet, for example if it exceeded the ResourceQuota
nodeName = lastChildNode.Name
node = lastChildNode
retryNum = len(childNodeIDs) - 1
} else {
retryNum := len(childNodeIDs)
// Create a new child node and append it to the retry node.
retryNum = len(childNodeIDs)
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, retryNum)
woc.addChildNode(retryNodeName, nodeName)
node = nil
}

localParams := make(map[string]string)
// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.getPodName(nodeName, processedTmpl.Name)
}
// Inject the retryAttempt number
localParams[common.LocalVarRetries] = strconv.Itoa(retryNum)
localParams = make(map[string]string)
// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.getPodName(nodeName, processedTmpl.Name)
}
// Inject the retryAttempt number
localParams[common.LocalVarRetries] = strconv.Itoa(retryNum)

processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, localParams)
if errorsutil.IsTransientErr(err) {
return node, err
}
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err
}
processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, localParams)
if errorsutil.IsTransientErr(err) {
return node, err
}
if err != nil {
return woc.initializeNodeOrMarkError(node, nodeName, templateScope, orgTmpl, opts.boundaryID, opts.nodeFlag, err), err
}
}

Expand Down
128 changes: 128 additions & 0 deletions workflow/controller/operator_test.go
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
batchfake "k8s.io/client-go/kubernetes/typed/batch/v1/fake"
corefake "k8s.io/client-go/kubernetes/typed/core/v1/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -9745,3 +9746,130 @@ func TestGetChildNodeIdsAndLastRetriedNode(t *testing.T) {
assert.Equal(t, childNodes[1].ID, lastChildNode.ID)
})
}

func TestRetryWhenEncounterExceededQuota(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(`
kind: Workflow
apiVersion: argoproj.io/v1alpha1
metadata:
name: exceeded-quota
creationTimestamp:
labels:
workflows.argoproj.io/phase: Running
annotations:
workflows.argoproj.io/pod-name-format: v2
spec:
templates:
- name: entrypoint
inputs: {}
outputs: {}
metadata: {}
container:
name: 'main'
image: centos:7
command:
- python
- "-c"
- echo
args:
- "{{retries}}"
- "{{pod.name}}"
resources: {}
retryStrategy:
limit: 10
retryPolicy: Always
backoff:
duration: 5s
entrypoint: entrypoint
arguments: {}
status:
phase: Runningg
startedAt: '2023-09-05T12:02:20Z'
finishedAt:
estimatedDuration: 1
progress: 0/1
nodes:
exceeded-quota:
id: exceeded-quota
name: exceeded-quota
displayName: exceeded-quota
type: Retry
templateName: main
templateScope: local/exceeded-quota
phase: Running
startedAt: '2023-09-05T12:02:20Z'
finishedAt:
estimatedDuration: 1
progress: 0/1
children:
- exceeded-quota-3674300323
- exceeded-quota-hook-8574637190
- exceeded-quota-8574637190
exceeded-quota-3674300323:
id: exceeded-quota-3674300323
name: exceeded-quota(0)
displayName: exceeded-quota(0)
type: Pod
nodeFlag:
retried: true
templateName: main
templateScope: local/exceeded-quota
phase: Failed
message: 'test1.test "test" is forbidden: exceeded quota'
startedAt: '2023-09-05T12:02:20Z'
finishedAt:
estimatedDuration: 1
progress: 0/1
exceeded-quota-hook-8574637190:
id: exceeded-quota-hook-8574637190
name: exceeded-quota-hook
displayName: exceeded-quota-hook
type: Pod
nodeFlag:
hooked: true
exceeded-quota-8574637190:
id: exceeded-quota-8574637190
name: exceeded-quota(1)
displayName: exceeded-quota(1)
type: Pod
nodeFlag:
retried: true
templateName: main
templateScope: local/exceeded-quota
phase: Pending
message: 'test1.test "test" is forbidden: exceeded quota'
startedAt: '2023-09-05T12:02:20Z'
finishedAt:
estimatedDuration: 1
progress: 0/1
artifactRepositoryRef: {}
artifactGCStatus:
notSpecified: true
`)

cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()

controller.kubeclientset.(*fake.Clientset).CoreV1().(*corefake.FakeCoreV1).Fake.PrependReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
createAction, ok := action.(k8stesting.CreateAction)
assert.True(t, ok)

pod, ok := createAction.GetObject().(*apiv1.Pod)
assert.True(t, ok)

for _, container := range pod.Spec.Containers {
if container.Name == "main" {
t.Log("Container args: ", container.Args[0], container.Args[1])
assert.Equal(t, "1", container.Args[0])
}
}

return true, pod, nil
})

woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
}

0 comments on commit b998c50

Please sign in to comment.