Skip to content

Commit

Permalink
feat: speed up resolve reference (#12328)
Browse files Browse the repository at this point in the history
Signed-off-by: shuangkun <tsk2013uestc@163.com>
  • Loading branch information
shuangkun committed Jan 15, 2024
1 parent 01936bb commit 1e7b239
Showing 1 changed file with 42 additions and 8 deletions.
50 changes: 42 additions & 8 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/Knetic/govaluate"
Expand Down Expand Up @@ -398,6 +399,15 @@ func shouldExecute(when string) (bool, error) {
return boolRes, nil
}

func errorFromChannel(errCh <-chan error) error {
select {
case err := <-errCh:
return err
default:
}
return nil
}

// resolveReferences replaces any references to outputs of previous steps, or artifacts in the inputs
// NOTE: by now, input parameters should have been substituted throughout the template, so we only
// are concerned with:
Expand All @@ -415,21 +425,21 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
return nil, err
}

for i, step := range stepGroup {
// Resolve a Step's References and add it to newStepGroup
resolveStepReferences := func(i int, step wfv1.WorkflowStep, newStepGroup []wfv1.WorkflowStep) error {
// Step 1: replace all parameter scope references in the step
// TODO: improve this
stepBytes, err := json.Marshal(step)
if err != nil {
return nil, errors.InternalWrapError(err)
return errors.InternalWrapError(err)
}
newStepStr, err := template.Replace(string(stepBytes), woc.globalParams.Merge(scope.getParameters()), true)
if err != nil {
return nil, err
return err
}
var newStep wfv1.WorkflowStep
err = json.Unmarshal([]byte(newStepStr), &newStep)
if err != nil {
return nil, errors.InternalWrapError(err)
return errors.InternalWrapError(err)
}

// If we are not executing, don't attempt to resolve any artifact references. We only check if we are executing after
Expand All @@ -442,13 +452,13 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
if newStep.ShouldExpand() {
proceed = true
} else {
return nil, err
return err
}
}
if !proceed {
// We can simply return this WorkflowStep; the fact that it won't execute will be reconciled later on in execution
newStepGroup[i] = newStep
continue
return nil
}

// Step 2: replace all artifact references
Expand All @@ -462,13 +472,37 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
if art.Optional {
continue
}
return nil, fmt.Errorf("unable to resolve references: %s", err)
return fmt.Errorf("unable to resolve references: %s", err)
}
resolvedArt.Name = art.Name
newStep.Arguments.Artifacts[j] = *resolvedArt
}

newStepGroup[i] = newStep
return nil
}

// When resolveStepReferences we can use a channel parallelStepNum to control the number of concurrencies
parallelStepNum := make(chan string, 500)
errCh := make(chan error, len(stepGroup)) // contains the error during resolveStepReferences
var wg sync.WaitGroup
for i, step := range stepGroup {
parallelStepNum <- step.Name
wg.Add(1)
go func(i int, step wfv1.WorkflowStep) {
defer wg.Done()
if err := resolveStepReferences(i, step, newStepGroup); err != nil {
woc.log.WithFields(log.Fields{"stepName": step.Name}).WithError(err).Error("Failed to resolve references")
errCh <- err
}
<-parallelStepNum
}(i, step)
}
wg.Wait()

err = errorFromChannel(errCh) // fetch the first error during resolveStepReferences
if err != nil {
return nil, fmt.Errorf("Failed to resolve references: %s", err)
}
return newStepGroup, nil
}
Expand Down

0 comments on commit 1e7b239

Please sign in to comment.