Skip to content

Commit

Permalink
fix: cache configmap is not created when workflow has retryStrategy. Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shuangkun committed Jan 19, 2024
1 parent 1ab7cd2 commit 46c1324
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 6 deletions.
3 changes: 1 addition & 2 deletions workflow/controller/exit_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (

func (woc *wfOperationCtx) runOnExitNode(ctx context.Context, exitHook *wfv1.LifecycleHook, parentNode *wfv1.NodeStatus, boundaryID string, tmplCtx *templateresolution.Context, prefix string, scope *wfScope) (bool, *wfv1.NodeStatus, error) {
outputs := parentNode.Outputs
if parentNode.Type == wfv1.NodeTypeRetry {
lastChildNode := getChildNodeIndex(parentNode, woc.wf.Status.Nodes, -1)
if lastChildNode := woc.possiblyGetRetryChildNode(parentNode); lastChildNode != nil {
outputs = lastChildNode.Outputs
}

Expand Down
3 changes: 1 addition & 2 deletions workflow/controller/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ func (woc *wfOperationCtx) executeTmplLifeCycleHook(ctx context.Context, scope *
// executeTemplated should be invoked when hookedNode != nil, because we should reexecute the function to check mutex condition, etc.
if execute || hookedNode != nil {
outputs := parentNode.Outputs
if parentNode.Type == wfv1.NodeTypeRetry {
lastChildNode := getChildNodeIndex(parentNode, woc.wf.Status.Nodes, -1)
if lastChildNode := woc.possiblyGetRetryChildNode(parentNode); lastChildNode != nil {
outputs = lastChildNode.Outputs
}
woc.log.WithField("lifeCycleHook", hookName).WithField("node", hookNodeName).WithField("hookName", hookName).Info("Running hooks")
Expand Down
22 changes: 20 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,6 +1736,14 @@ func (woc *wfOperationCtx) deletePVCs(ctx context.Context) error {
return firstErr
}

// possiblyGetRetryChildNode returns the child that getChildNodeIndex selects
// with index -1 (the callers treat this as the last child) when node is a
// retry node that was not served from a memoization cache hit. For any other
// node type, or for a retry node whose MemoizationStatus records a hit, it
// returns nil so the caller keeps using the node itself.
func (woc *wfOperationCtx) possiblyGetRetryChildNode(node *wfv1.NodeStatus) *wfv1.NodeStatus {
	if node.Type != wfv1.NodeTypeRetry {
		return nil
	}
	// A memoized hit means the retry node itself carries the cached result.
	if status := node.MemoizationStatus; status != nil && status.Hit {
		return nil
	}
	return getChildNodeIndex(node, woc.wf.Status.Nodes, -1)
}

func getChildNodeIndex(node *wfv1.NodeStatus, nodes wfv1.Nodes, index int) *wfv1.NodeStatus {
if len(node.Children) <= 0 {
return nil
Expand Down Expand Up @@ -2409,6 +2417,16 @@ func (woc *wfOperationCtx) initializeExecutableNode(nodeName string, nodeType wf
node.Inputs = executeTmpl.Inputs.DeepCopy()
}

// Set the MemoizationStatus
if node.MemoizationStatus == nil && executeTmpl.Memoize != nil {
memoizationStatus := &wfv1.MemoizationStatus{
Hit: false,
Key: executeTmpl.Memoize.Key,
CacheName: executeTmpl.Memoize.Cache.ConfigMap.Name,
}
node.MemoizationStatus = memoizationStatus
}

if nodeType == wfv1.NodeTypeSuspend {
node = addRawOutputFields(node, executeTmpl)
}
Expand Down Expand Up @@ -2978,8 +2996,8 @@ func (woc *wfOperationCtx) requeueIfTransientErr(err error, nodeName string) (*w
func (woc *wfOperationCtx) buildLocalScope(scope *wfScope, prefix string, node *wfv1.NodeStatus) {
// It may be that the node is a retry node, in which case we want to get the outputs of the last node
// in the retry group instead of the retry node itself.
if node.Type == wfv1.NodeTypeRetry {
node = getChildNodeIndex(node, woc.wf.Status.Nodes, -1)
if lastChildNode := woc.possiblyGetRetryChildNode(node); lastChildNode != nil {
node = lastChildNode
}

if node.ID != "" {
Expand Down
209 changes: 209 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5339,6 +5339,215 @@ func TestConfigMapCacheLoadOperateMaxAge(t *testing.T) {
}
}

// workflowStepCachedWithRetryStrategy is a Workflow manifest with a single
// container template ("whalesay") that declares both a retryStrategy and a
// memoize section backed by the "whalesay-cache" ConfigMap.
//
// The YAML nesting is significant: template fields must sit under the
// "templates" list item and spec fields under "spec", otherwise the keys are
// parsed as top-level (and duplicated) entries.
var workflowStepCachedWithRetryStrategy = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: memoized-workflow-test
spec:
  entrypoint: whalesay
  arguments:
    parameters:
    - name: message
      value: hi-there-world
  templates:
  - name: whalesay
    inputs:
      parameters:
      - name: message
    retryStrategy:
      limit: "10"
    memoize:
      key: "{{inputs.parameters.message}}"
      cache:
        configMap:
          name: whalesay-cache
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["sleep 10; cowsay {{inputs.parameters.message}} > /tmp/hello_world.txt"]
    outputs:
      parameters:
      - name: hello
        valueFrom:
          path: /tmp/hello_world.txt
`

// workflowDagCachedWithRetryStrategy is a Workflow manifest whose entrypoint
// DAG runs three pairs of tasks, each pair sharing one memoization key in the
// "memoization-test-cache" ConfigMap:
//   - regular-1/regular-2 use the plain memoized "run" template,
//   - with-retries-1/with-retries-2 use "run-with-retries" (memoize plus
//     retryStrategy),
//   - with-dag-1/with-dag-2 use "run-with-dag" (a memoized nested DAG that
//     calls the non-memoized "run-raw" template).
//
// The second task of each pair depends on the first one succeeding.
// The YAML nesting is significant; flattening it turns task and template
// fields into top-level keys.
var workflowDagCachedWithRetryStrategy = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: memoized-workflow-test
spec:
  entrypoint: main
  # podGC:
  #   strategy: OnPodCompletion
  templates:
    - name: main
      dag:
        tasks:
          - name: regular-1
            template: run
            arguments:
              parameters:
                - name: id
                  value: 1
                - name: cache-key
                  value: '{{workflow.name}}'
          - name: regular-2
            template: run
            depends: regular-1.Succeeded
            arguments:
              parameters:
                - name: id
                  value: 2
                - name: cache-key
                  value: '{{workflow.name}}'
          - name: with-retries-1
            template: run-with-retries
            arguments:
              parameters:
                - name: id
                  value: 3
                - name: cache-key
                  value: '{{workflow.name}}'
          - name: with-retries-2
            template: run-with-retries
            depends: with-retries-1.Succeeded
            arguments:
              parameters:
                - name: id
                  value: 4
                - name: cache-key
                  value: '{{workflow.name}}'
          - name: with-dag-1
            template: run-with-dag
            arguments:
              parameters:
                - name: id
                  value: 5
                - name: cache-key
                  value: '{{workflow.name}}'
          - name: with-dag-2
            template: run-with-dag
            depends: with-dag-1.Succeeded
            arguments:
              parameters:
                - name: id
                  value: 6
                - name: cache-key
                  value: '{{workflow.name}}'
    - name: run
      inputs:
        parameters:
          - name: id
          - name: cache-key
      script:
        image: ubuntu:22.04
        command: [bash]
        source: |
          sleep 30
          echo result: {{inputs.parameters.id}}
      memoize:
        key: "regular-{{inputs.parameters.cache-key}}"
        cache:
          configMap:
            name: memoization-test-cache
    - name: run-with-retries
      inputs:
        parameters:
          - name: id
          - name: cache-key
      script:
        image: ubuntu:22.04
        command: [bash]
        source: |
          sleep 30
          echo result: {{inputs.parameters.id}}
      memoize:
        key: "retry-{{inputs.parameters.cache-key}}"
        cache:
          configMap:
            name: memoization-test-cache
      retryStrategy:
        limit: '1'
        retryPolicy: Always
    - name: run-raw
      inputs:
        parameters:
          - name: id
          - name: cache-key
      script:
        image: ubuntu:22.04
        command: [bash]
        source: |
          sleep 30
          echo result: {{inputs.parameters.id}}
    - name: run-with-dag
      inputs:
        parameters:
          - name: id
          - name: cache-key
      dag:
        tasks:
          - name: run-raw-step
            template: run-raw
            arguments:
              parameters:
                - name: id
                  value: '{{inputs.parameters.id}}'
                - name: cache-key
                  value: '{{inputs.parameters.cache-key}}'
      memoize:
        key: "dag-{{inputs.parameters.cache-key}}"
        cache:
          configMap:
            name: memoization-test-cache`

// TestStepConfigMapCacheCreateWhenHaveRetryStrategy operates the
// workflowStepCachedWithRetryStrategy workflow (memoize + retryStrategy on
// one template), moves its pods to PodSucceeded, operates again, and then
// expects the "whalesay-cache" ConfigMap to exist in the "default" namespace
// carrying the cache ConfigMap type label, with the workflow in the
// Succeeded phase.
func TestStepConfigMapCacheCreateWhenHaveRetryStrategy(t *testing.T) {
	wf := wfv1.MustUnmarshalWorkflow(workflowStepCachedWithRetryStrategy)
	cancel, controller := newController()
	defer cancel()

	ctx := context.Background()
	_, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{})
	// Nothing below is meaningful if the workflow could not be stored.
	if !assert.NoError(t, err) {
		return
	}

	woc := newWorkflowOperationCtx(wf, controller)
	woc.operate(ctx)
	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
	woc.operate(ctx)

	cm, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, "whalesay-cache", metav1.GetOptions{})
	// assert.* does not stop the test, so only inspect the labels when the
	// lookup actually produced a ConfigMap; otherwise cm may be nil and the
	// field access would panic instead of reporting a clean failure.
	if assert.NoError(t, err) && assert.NotNil(t, cm) {
		assert.Contains(t, cm.Labels, common.LabelKeyConfigMapType)
		assert.Equal(t, common.LabelValueTypeConfigMapCache, cm.Labels[common.LabelKeyConfigMapType])
	}
	assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

// TestDAGConfigMapCacheCreateWhenHaveRetryStrategy operates the
// workflowDagCachedWithRetryStrategy workflow (a DAG mixing memoized
// templates with and without retryStrategy), moves its pods to PodSucceeded,
// operates again, and then expects the "memoization-test-cache" ConfigMap to
// exist in the "default" namespace carrying the cache ConfigMap type label,
// with the workflow in the Succeeded phase.
func TestDAGConfigMapCacheCreateWhenHaveRetryStrategy(t *testing.T) {
	wf := wfv1.MustUnmarshalWorkflow(workflowDagCachedWithRetryStrategy)
	cancel, controller := newController()
	defer cancel()

	ctx := context.Background()
	_, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(wf.ObjectMeta.Namespace).Create(ctx, wf, metav1.CreateOptions{})
	// Nothing below is meaningful if the workflow could not be stored.
	if !assert.NoError(t, err) {
		return
	}

	woc := newWorkflowOperationCtx(wf, controller)
	woc.operate(ctx)
	makePodsPhase(ctx, woc, apiv1.PodSucceeded)
	woc.operate(ctx)

	cm, err := controller.kubeclientset.CoreV1().ConfigMaps("default").Get(ctx, "memoization-test-cache", metav1.GetOptions{})
	// assert.* does not stop the test, so only inspect the labels when the
	// lookup actually produced a ConfigMap; otherwise cm may be nil and the
	// field access would panic instead of reporting a clean failure.
	if assert.NoError(t, err) && assert.NotNil(t, cm) {
		assert.Contains(t, cm.Labels, common.LabelKeyConfigMapType)
		assert.Equal(t, common.LabelValueTypeConfigMapCache, cm.Labels[common.LabelKeyConfigMapType])
	}
	assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase)
}

func TestConfigMapCacheLoadNoLabels(t *testing.T) {
sampleConfigMapCacheEntry := apiv1.ConfigMap{
Data: map[string]string{
Expand Down

0 comments on commit 46c1324

Please sign in to comment.