Skip to content

Commit

Permalink
feat: speed up resolve reference
Browse files Browse the repository at this point in the history
Signed-off-by: shuangkun <tsk2013uestc@163.com>
  • Loading branch information
shuangkun committed Dec 7, 2023
1 parent 64ee6ae commit 3265886
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/Knetic/govaluate"
Expand Down Expand Up @@ -414,21 +415,20 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
return nil, err
}

for i, step := range stepGroup {
resolveStepReferences := func(i int, step *wfv1.WorkflowStep, newStepGroup []wfv1.WorkflowStep) error {
// Step 1: replace all parameter scope references in the step
// TODO: improve this
stepBytes, err := json.Marshal(step)
if err != nil {
return nil, errors.InternalWrapError(err)
return errors.InternalWrapError(err)
}
newStepStr, err := template.Replace(string(stepBytes), woc.globalParams.Merge(scope.getParameters()), true)
if err != nil {
return nil, err
return err
}
var newStep wfv1.WorkflowStep
err = json.Unmarshal([]byte(newStepStr), &newStep)
if err != nil {
return nil, errors.InternalWrapError(err)
return errors.InternalWrapError(err)
}

// If we are not executing, don't attempt to resolve any artifact references. We only check if we are executing after
Expand All @@ -441,13 +441,13 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
if newStep.ShouldExpand() {
proceed = true
} else {
return nil, err
return err
}
}
if !proceed {
// We can simply return this WorkflowStep; the fact that it won't execute will be reconciled later on in execution
newStepGroup[i] = newStep
continue
return nil
}

// Step 2: replace all artifact references
Expand All @@ -461,13 +461,34 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
if art.Optional {
continue
}
return nil, fmt.Errorf("unable to resolve references: %s", err)
return fmt.Errorf("unable to resolve references: %s", err)
}
resolvedArt.Name = art.Name
newStep.Arguments.Artifacts[j] = *resolvedArt
}

newStepGroup[i] = newStep
return nil
}

parallelStepNum := make(chan string, 500)
var resolveReferenceError error
var wg sync.WaitGroup
for i, step := range stepGroup {
parallelStepNum <- step.Name
wg.Add(1)
go func(i int, step *wfv1.WorkflowStep, newStepGroup []wfv1.WorkflowStep) {
defer wg.Done()
if err := resolveStepReferences(i, step, newStepGroup); err != nil {
woc.log.WithFields(log.Fields{"stepName": step.Name}).WithError(err).Error(err.Error())
resolveReferenceError = err
}
<-parallelStepNum
}(i, &step, newStepGroup)
}
wg.Wait()
if resolveReferenceError != nil {
return nil, fmt.Errorf("Failed to resolve references: %s", resolveReferenceError)
}
return newStepGroup, nil
}
Expand Down

0 comments on commit 3265886

Please sign in to comment.