Skip to content

Commit

Permalink
fix: reconcile wf when taskresult is added/updated. Fixes #10096 (#10097
Browse files Browse the repository at this point in the history
)

Signed-off-by: Michael Weibel <michael@helio.exchange>
  • Loading branch information
mweibel committed Nov 24, 2022
1 parent d03f5e5 commit e5ea21e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
5 changes: 3 additions & 2 deletions test/e2e/progress_test.go
Expand Up @@ -34,7 +34,6 @@ func (s *ProgressSuite) TestDefaultProgress() {
}

func (s *ProgressSuite) TestLoggedProgress() {
s.T().SkipNow()
toHaveProgress := func(p wfv1.Progress) fixtures.Condition {
return func(wf *wfv1.Workflow) (bool, string) {
return wf.Status.Nodes[wf.Name].Progress == p &&
Expand All @@ -46,7 +45,9 @@ func (s *ProgressSuite) TestLoggedProgress() {
Workflow("@testdata/progress-workflow.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(toHaveProgress("50/100"), time.Minute). // ARGO_PROGRESS_PATCH_TICK_DURATION=1m
WaitForWorkflow(fixtures.ToBeRunning).
WaitForWorkflow(toHaveProgress("0/100"), 20*time.Second).
WaitForWorkflow(toHaveProgress("50/100"), 20*time.Second).
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/testdata/progress-workflow.yaml
Expand Up @@ -17,4 +17,4 @@ spec:
image: argoproj/argosay:v2
command: ["/bin/sh", "-c"]
args:
- /argosay echo 50/100 $ARGO_PROGRESS_FILE && /argosay sleep 1m
- /argosay echo 0/100 $ARGO_PROGRESS_FILE && /argosay sleep 10s && /argosay echo 50/100 $ARGO_PROGRESS_FILE && /argosay sleep 10s
17 changes: 16 additions & 1 deletion workflow/controller/taskresult.go
Expand Up @@ -11,6 +11,7 @@ import (

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)

Expand All @@ -21,7 +22,7 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex
String()
log.WithField("labelSelector", labelSelector).
Info("Watching task results")
return wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer(
informer := wfextvv1alpha1.NewFilteredWorkflowTaskResultInformer(
wfc.wfclientset,
wfc.GetManagedNamespace(),
20*time.Minute,
Expand All @@ -32,6 +33,20 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndex
options.LabelSelector = labelSelector
},
)
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
result := new.(*wfv1.WorkflowTaskResult)
workflow := result.Labels[common.LabelKeyWorkflow]
wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow)
},
UpdateFunc: func(old, new interface{}) {
result := new.(*wfv1.WorkflowTaskResult)
workflow := result.Labels[common.LabelKeyWorkflow]
wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow)
},
})
return informer
}

func (woc *wfOperationCtx) taskResultReconciliation() {
Expand Down

0 comments on commit e5ea21e

Please sign in to comment.