Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: speed up resolve reference #12328

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/Knetic/govaluate"
Expand Down Expand Up @@ -397,6 +398,15 @@ func shouldExecute(when string) (bool, error) {
return boolRes, nil
}

func errorFromChannel(errCh <-chan error) error {
select {
case err := <-errCh:
return err
default:
}
return nil
}

// resolveReferences replaces any references to outputs of previous steps, or artifacts in the inputs
// NOTE: by now, input parameters should have been substituted throughout the template, so we only
// are concerned with:
Expand All @@ -414,21 +424,21 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
return nil, err
}

for i, step := range stepGroup {
// Resolve a Step's References and add it to newStepGroup
resolveStepReferences := func(i int, step wfv1.WorkflowStep, newStepGroup []wfv1.WorkflowStep) error {
// Step 1: replace all parameter scope references in the step
// TODO: improve this
stepBytes, err := json.Marshal(step)
if err != nil {
return nil, errors.InternalWrapError(err)
return errors.InternalWrapError(err)
}
newStepStr, err := template.Replace(string(stepBytes), woc.globalParams.Merge(scope.getParameters()), true)
if err != nil {
return nil, err
return err
}
var newStep wfv1.WorkflowStep
err = json.Unmarshal([]byte(newStepStr), &newStep)
if err != nil {
return nil, errors.InternalWrapError(err)
return errors.InternalWrapError(err)
}

// If we are not executing, don't attempt to resolve any artifact references. We only check if we are executing after
Expand All @@ -441,13 +451,13 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
if newStep.ShouldExpand() {
proceed = true
} else {
return nil, err
return err
}
}
if !proceed {
// We can simply return this WorkflowStep; the fact that it won't execute will be reconciled later on in execution
newStepGroup[i] = newStep
continue
return nil
}

// Step 2: replace all artifact references
Expand All @@ -461,13 +471,37 @@ func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scop
if art.Optional {
continue
}
return nil, fmt.Errorf("unable to resolve references: %s", err)
return fmt.Errorf("unable to resolve references: %s", err)
}
resolvedArt.Name = art.Name
newStep.Arguments.Artifacts[j] = *resolvedArt
}

newStepGroup[i] = newStep
return nil
}

// When resolveStepReferences we can use a channel parallelStepNum to control the number of concurrencies
parallelStepNum := make(chan string, 500)
errCh := make(chan error, len(stepGroup)) // contains the error during resolveStepReferences
var wg sync.WaitGroup
for i, step := range stepGroup {
parallelStepNum <- step.Name
wg.Add(1)
go func(i int, step wfv1.WorkflowStep) {
defer wg.Done()
if err := resolveStepReferences(i, step, newStepGroup); err != nil {
woc.log.WithFields(log.Fields{"stepName": step.Name}).WithError(err).Error("Failed to resolve references")
errCh <- err
}
<-parallelStepNum
juliev0 marked this conversation as resolved.
Show resolved Hide resolved
}(i, step)
}
wg.Wait()

err = errorFromChannel(errCh) // fetch the first error during resolveStepReferences
if err != nil {
return nil, fmt.Errorf("Failed to resolve references: %s", err)
}
return newStepGroup, nil
}
Expand Down
Loading