feat(controller): Retry transient offload errors. Resolves #4464 #4482

Merged: 12 commits, Nov 23, 2020
workflow/controller/controller.go (10 changes: 7 additions & 3 deletions)
@@ -39,6 +39,7 @@ import (
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1"
authutil "github.com/argoproj/argo/util/auth"
+errorsutil "github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/workflow/common"
controllercache "github.com/argoproj/argo/workflow/controller/cache"
"github.com/argoproj/argo/workflow/controller/estimation"
@@ -580,9 +581,12 @@ func (wfc *WorkflowController) processNextItem() bool {

err = wfc.hydrator.Hydrate(woc.wf)
if err != nil {
-woc.log.Errorf("hydration failed: %v", err)
-woc.markWorkflowError(err)
-woc.persistUpdates()
+transientErr := errorsutil.IsTransientErr(err)
+woc.log.WithField("transientErr", transientErr).Errorf("hydration failed: %v", err)
+if !transientErr {
+	woc.markWorkflowError(err)
+	woc.persistUpdates()
+}
return true
}

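The new control flow above treats a hydration failure as fatal only when errorsutil.IsTransientErr reports that the error is not retryable; a transient error is just logged and the work item is picked up again on a later pass. A minimal standalone sketch of that gating pattern, assuming a transient-error classifier in the spirit of the real util/errors helper (the isTransient function below is a hypothetical stand-in and much narrower than the real one):

package main

import (
	"errors"
	"fmt"
	"net"
)

// isTransient stands in for errorsutil.IsTransientErr; here it only treats
// network timeouts as retryable, which is narrower than the real helper.
func isTransient(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}

// processNextItem mirrors the new control flow: a transient hydration error is
// only logged, leaving the item eligible for a later retry; a non-transient
// error marks the workflow as permanently failed.
func processNextItem(hydrate func() error, markWorkflowError func(error)) bool {
	if err := hydrate(); err != nil {
		transient := isTransient(err)
		fmt.Printf("hydration failed (transientErr=%v): %v\n", transient, err)
		if !transient {
			markWorkflowError(err) // permanent failure: record and persist it
		}
		return true // returning true keeps the worker loop running either way
	}
	// ...continue operating on the hydrated workflow...
	return true
}

func main() {
	hydrate := func() error { return errors.New("offload node status not found") }
	processNextItem(hydrate, func(err error) { fmt.Println("marked error:", err) })
}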
workflow/hydrator/hydrator.go (25 changes: 22 additions & 3 deletions)
@@ -2,10 +2,10 @@ package hydrator

import (
"os"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@@ -47,14 +47,33 @@ func (h hydrator) HydrateWithNodes(wf *wfv1.Workflow, offloadedNodes wfv1.Nodes)
wf.Status.OffloadNodeStatusVersion = ""
}

+// should be <10s
+// Retry  Seconds
+//     1     0.10
+//     2     0.30
+//     3     0.70
+//     4     1.50
+//     5     3.10
+var readRetry = wait.Backoff{Steps: 5, Duration: 100 * time.Millisecond, Factor: 2}
+
+// needs to be long
+// http://backoffcalculator.com/?attempts=5&rate=2&interval=1
+// Retry  Seconds
+//     1     1.00
+//     2     3.00
+//     3     7.00
+//     4    15.00
+//     5    31.00
+var writeRetry = wait.Backoff{Steps: 5, Duration: 1 * time.Second, Factor: 2}

func (h hydrator) Hydrate(wf *wfv1.Workflow) error {
err := packer.DecompressWorkflow(wf)
if err != nil {
return err
}
if wf.Status.IsOffloadNodeStatus() {
var offloadedNodes wfv1.Nodes
-err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
+err := wait.ExponentialBackoff(readRetry, func() (bool, error) {
offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
return err == nil, err
})
@@ -80,7 +99,7 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error {
}
if packer.IsTooLargeError(err) || alwaysOffloadNodeStatus {
var offloadVersion string
-err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
+err := wait.ExponentialBackoff(writeRetry, func() (bool, error) {
offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
return err == nil, err
})
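As a sanity check on the cumulative timings quoted in the readRetry and writeRetry comments, a standalone sketch (not part of this PR) that steps through the same wait.Backoff values and prints the per-attempt and cumulative delays:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	configs := []struct {
		name    string
		backoff wait.Backoff
	}{
		{"readRetry", wait.Backoff{Steps: 5, Duration: 100 * time.Millisecond, Factor: 2}},
		{"writeRetry", wait.Backoff{Steps: 5, Duration: 1 * time.Second, Factor: 2}},
	}
	for _, c := range configs {
		var total time.Duration
		fmt.Println(c.name)
		for i := 1; i <= 5; i++ {
			// Step returns the current delay and scales the next one by Factor.
			d := c.backoff.Step()
			total += d
			fmt.Printf("  retry %d: wait %v, cumulative %v\n", i, d, total)
		}
	}
}

This reproduces the 0.10s through 3.10s schedule for readRetry and 1s through 31s for writeRetry; presumably the read path is kept under ten seconds because Hydrate runs on the controller's processing loop, while the Dehydrate/offload path can afford the longer writeRetry window, as the "needs to be long" comment suggests.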