Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): Retry transient offload errors. Resolves #4464 #4482

Merged
merged 12 commits into from
Nov 23, 2020
96 changes: 96 additions & 0 deletions persist/sqldb/retry/offload_node_status_repo_with_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package retry

import (
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/util/errors"
)

// readRetry is the backoff applied to read operations (Get, List, ListOldOffloads).
// The total time spent waiting should be <10s.
// The delay starts at 100ms and doubles each step; the table lists the cumulative
// seconds waited before each retry:
// Retry Seconds
// 1 0.10
// 2 0.30
// 3 0.70
// 4 1.50
// 5 3.10
// NOTE(review): wait.ExponentialBackoff appears to count Steps as the total number of
// condition checks (first attempt included), so with Steps: 5 the last row may never be
// reached - confirm against the k8s.io/apimachinery version in use.
var readRetry = wait.Backoff{Steps: 5, Duration: 100 * time.Millisecond, Factor: 2}

// writeRetry is the backoff applied to write operations (Save, Delete).
// It needs to be long.
// The delay starts at 1s and doubles each step; the table lists the cumulative
// seconds waited before each retry:
// http://backoffcalculator.com/?attempts=5&rate=2&interval=1
// Retry Seconds
// 1 1.00
// 2 3.00
// 3 7.00
// 4 15.00
// 5 31.00
// NOTE(review): wait.ExponentialBackoff appears to count Steps as the total number of
// condition checks (first attempt included), so with Steps: 5 the last row may never be
// reached - confirm against the k8s.io/apimachinery version in use.
var writeRetry = wait.Backoff{Steps: 5, Duration: 1 * time.Second, Factor: 2}

// offloadNodeStatusRepoWithRetry is a sqldb.OffloadNodeStatusRepo that forwards every
// call to another repo, running the data operations under an exponential backoff.
type offloadNodeStatusRepoWithRetry struct {
	// delegate is the wrapped repo that performs the actual work.
	delegate sqldb.OffloadNodeStatusRepo
}

// WithRetry returns a sqldb.OffloadNodeStatusRepo that wraps delegate and runs its
// data operations through an exponential backoff.
func WithRetry(delegate sqldb.OffloadNodeStatusRepo) sqldb.OffloadNodeStatusRepo {
	repo := offloadNodeStatusRepoWithRetry{delegate: delegate}
	return &repo
}

// Save stores nodes through the delegate, retrying with writeRetry for as long as the
// delegate fails with a transient error.
//
// It returns the version reported by the delegate. A non-transient error is returned
// as soon as it occurs; if every attempt fails with a transient error, the last such
// error is returned.
func (o *offloadNodeStatusRepoWithRetry) Save(uid, namespace string, nodes wfv1.Nodes) (string, error) {
	var version string
	var lastErr error
	err := wait.ExponentialBackoff(writeRetry, func() (bool, error) {
		version, lastErr = o.delegate.Save(uid, namespace, nodes)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops as soon as the condition returns a non-nil
			// error, so a transient error must be swallowed here to get another attempt.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Surface the delegate's own error instead of the generic wait timeout.
		return version, lastErr
	}
	return version, err
}

// Get loads the nodes for uid/version through the delegate, retrying with readRetry
// for as long as the delegate fails with a transient error.
//
// A non-transient error is returned as soon as it occurs; if every attempt fails with
// a transient error, the last such error is returned.
func (o *offloadNodeStatusRepoWithRetry) Get(uid, version string) (wfv1.Nodes, error) {
	var nodes wfv1.Nodes
	var lastErr error
	err := wait.ExponentialBackoff(readRetry, func() (bool, error) {
		nodes, lastErr = o.delegate.Get(uid, version)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops as soon as the condition returns a non-nil
			// error, so a transient error must be swallowed here to get another attempt.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Surface the delegate's own error instead of the generic wait timeout.
		return nodes, lastErr
	}
	return nodes, err
}

// done reports whether retrying should stop for the given result: true on success
// (nil error) and on any error that errors.IsTransientErr does not classify as transient.
func done(err error) bool {
	if err == nil {
		return true
	}
	return !errors.IsTransientErr(err)
}

// List returns the offloaded nodes of namespace through the delegate, retrying with
// readRetry for as long as the delegate fails with a transient error.
//
// A non-transient error is returned as soon as it occurs; if every attempt fails with
// a transient error, the last such error is returned.
func (o *offloadNodeStatusRepoWithRetry) List(namespace string) (map[sqldb.UUIDVersion]wfv1.Nodes, error) {
	var nodes map[sqldb.UUIDVersion]wfv1.Nodes
	var lastErr error
	err := wait.ExponentialBackoff(readRetry, func() (bool, error) {
		nodes, lastErr = o.delegate.List(namespace)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops as soon as the condition returns a non-nil
			// error, so a transient error must be swallowed here to get another attempt.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Surface the delegate's own error instead of the generic wait timeout.
		return nodes, lastErr
	}
	return nodes, err
}

// ListOldOffloads returns the delegate's old offload records for namespace, retrying
// with readRetry for as long as the delegate fails with a transient error.
//
// A non-transient error is returned as soon as it occurs; if every attempt fails with
// a transient error, the last such error is returned.
func (o *offloadNodeStatusRepoWithRetry) ListOldOffloads(namespace string) ([]sqldb.UUIDVersion, error) {
	var versions []sqldb.UUIDVersion
	var lastErr error
	err := wait.ExponentialBackoff(readRetry, func() (bool, error) {
		versions, lastErr = o.delegate.ListOldOffloads(namespace)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops as soon as the condition returns a non-nil
			// error, so a transient error must be swallowed here to get another attempt.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Surface the delegate's own error instead of the generic wait timeout.
		return versions, lastErr
	}
	return versions, err
}

// Delete removes the record for uid/version through the delegate, retrying with
// writeRetry for as long as the delegate fails with a transient error.
//
// A non-transient error is returned as soon as it occurs; if every attempt fails with
// a transient error, the last such error is returned.
func (o *offloadNodeStatusRepoWithRetry) Delete(uid, version string) error {
	var lastErr error
	err := wait.ExponentialBackoff(writeRetry, func() (bool, error) {
		lastErr = o.delegate.Delete(uid, version)
		if !done(lastErr) {
			// wait.ExponentialBackoff stops as soon as the condition returns a non-nil
			// error, so a transient error must be swallowed here to get another attempt.
			return false, nil
		}
		return true, lastErr
	})
	if lastErr != nil {
		// Surface the delegate's own error instead of the generic wait timeout.
		return lastErr
	}
	return err
}

// IsEnabled forwards to the delegate without any retry.
func (o *offloadNodeStatusRepoWithRetry) IsEnabled() bool {
	enabled := o.delegate.IsEnabled()
	return enabled
}

// Compile-time check that the wrapper implements sqldb.OffloadNodeStatusRepo.
var _ sqldb.OffloadNodeStatusRepo = (*offloadNodeStatusRepoWithRetry)(nil)
85 changes: 85 additions & 0 deletions persist/sqldb/retry/offload_node_status_repo_with_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package retry

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apierr "k8s.io/apimachinery/pkg/api/errors"

"github.com/argoproj/argo/persist/sqldb"
sqldbmocks "github.com/argoproj/argo/persist/sqldb/mocks"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// Fixture errors shared by the tests in this file.
var (
	// transientErr is a Kubernetes "too many requests" API error.
	transientErr = apierr.NewTooManyRequests("", 0)
	// permanentErr is a plain error with an empty message.
	permanentErr = errors.New("")
)

// Test_offloadNodeStatusRepoWithRetry verifies that the retrying wrapper hides a single
// transient delegate failure from the caller and surfaces a permanent one.
//
// Every transient failure is registered as its own expectation with Once(): chaining two
// Return calls on the same expectation only keeps the last one, which would mean the
// transient error is never produced and the retry path is never exercised.
func Test_offloadNodeStatusRepoWithRetry(t *testing.T) {
	t.Run("PermanentError", func(t *testing.T) {
		delegate := &sqldbmocks.OffloadNodeStatusRepo{}
		o := WithRetry(delegate)
		delegate.On("Save", mock.Anything, mock.Anything, mock.Anything).
			Return("", transientErr).
			Once()
		delegate.On("Save", mock.Anything, mock.Anything, mock.Anything).
			Return("", permanentErr)
		_, err := o.Save("my-uid", "my-ns", wfv1.Nodes{})
		assert.Equal(t, permanentErr, err)
		// one transient failure + one permanent failure
		delegate.AssertNumberOfCalls(t, "Save", 2)
	})
	delegate := &sqldbmocks.OffloadNodeStatusRepo{}
	o := WithRetry(delegate)
	t.Run("Save", func(t *testing.T) {
		delegate.On("Save", "my-uid", "my-ns", mock.Anything).
			Return("", transientErr).
			Once()
		delegate.On("Save", "my-uid", "my-ns", mock.Anything).
			Return("my-version", nil)
		version, err := o.Save("my-uid", "my-ns", wfv1.Nodes{})
		if assert.NoError(t, err) {
			assert.Equal(t, "my-version", version)
		}
		delegate.AssertNumberOfCalls(t, "Save", 2)
	})
	t.Run("Get", func(t *testing.T) {
		delegate.On("Get", "my-uid", "my-version").
			Return(nil, transientErr).
			Once()
		delegate.On("Get", "my-uid", "my-version").
			Return(wfv1.Nodes{}, nil)
		nodes, err := o.Get("my-uid", "my-version")
		if assert.NoError(t, err) {
			assert.NotNil(t, nodes)
		}
		delegate.AssertNumberOfCalls(t, "Get", 2)
	})
	t.Run("List", func(t *testing.T) {
		delegate.On("List", "my-ns").
			Return(nil, transientErr).
			Once()
		delegate.On("List", "my-ns").
			Return(make(map[sqldb.UUIDVersion]wfv1.Nodes), nil)
		list, err := o.List("my-ns")
		if assert.NoError(t, err) {
			assert.NotNil(t, list)
		}
		delegate.AssertNumberOfCalls(t, "List", 2)
	})
	t.Run("ListOldOffloads", func(t *testing.T) {
		delegate.On("ListOldOffloads", "my-ns").
			Return(nil, transientErr).
			Once()
		delegate.On("ListOldOffloads", "my-ns").
			Return(make([]sqldb.UUIDVersion, 0), nil)
		list, err := o.ListOldOffloads("my-ns")
		if assert.NoError(t, err) {
			assert.NotNil(t, list)
		}
		delegate.AssertNumberOfCalls(t, "ListOldOffloads", 2)
	})
	t.Run("Delete", func(t *testing.T) {
		delegate.On("Delete", "my-uid", "my-version").
			Return(transientErr).
			Once()
		delegate.On("Delete", "my-uid", "my-version").
			Return(nil)
		err := o.Delete("my-uid", "my-version")
		assert.NoError(t, err)
		delegate.AssertNumberOfCalls(t, "Delete", 2)
	})
	t.Run("IsEnabled", func(t *testing.T) {
		delegate.On("IsEnabled").
			Return(true)
		assert.True(t, o.IsEnabled())
	})
}

func Test_done(t *testing.T) {
assert.True(t, done(nil))
assert.False(t, done(transientErr))
assert.True(t, done(permanentErr))
}
2 changes: 2 additions & 0 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/argoproj/argo"
"github.com/argoproj/argo/config"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/persist/sqldb/retry"
clusterwftemplatepkg "github.com/argoproj/argo/pkg/apiclient/clusterworkflowtemplate"
cronworkflowpkg "github.com/argoproj/argo/pkg/apiclient/cronworkflow"
eventpkg "github.com/argoproj/argo/pkg/apiclient/event"
Expand Down Expand Up @@ -157,6 +158,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
offloadRepo = retry.WithRetry(offloadRepo)
// we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService)
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/fixtures/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/argoproj/argo/config"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/persist/sqldb/retry"
"github.com/argoproj/argo/util/instanceid"
)

Expand All @@ -32,6 +33,7 @@ func newPersistence(kubeClient kubernetes.Interface, wcConfig *config.Config) *P
if err != nil {
panic(err)
}
offloadNodeStatusRepo = retry.WithRetry(offloadNodeStatusRepo)
instanceIDService := instanceid.NewService(wcConfig.InstanceID)
workflowArchive := sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), Namespace, instanceIDService)
return &Persistence{session, offloadNodeStatusRepo, workflowArchive}
Expand Down
4 changes: 3 additions & 1 deletion workflow/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/argoproj/argo/config"
"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/persist/sqldb/retry"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/workflow/hydrator"
)
Expand Down Expand Up @@ -51,10 +52,11 @@ func (wfc *WorkflowController) updateConfig(v interface{}) error {

wfc.session = session
if persistence.NodeStatusOffload {
wfc.offloadNodeStatusRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
offloadNodeStatusRepo, err := sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
if err != nil {
return err
}
wfc.offloadNodeStatusRepo = retry.WithRetry(offloadNodeStatusRepo)
log.Info("Node status offloading is enabled")
} else {
log.Info("Node status offloading is disabled")
Expand Down
6 changes: 5 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1899,7 +1899,11 @@ func (woc *wfOperationCtx) markWorkflowFailed(message string) {
}

func (woc *wfOperationCtx) markWorkflowError(err error) {
woc.markWorkflowPhase(wfv1.NodeError, err.Error())
if errorsutil.IsTransientErr(err) {
woc.markWorkflowPhase(woc.wf.Status.Phase, fmt.Sprintf("workflow had transient error: %v", err))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a significant change - I think this makes it a v3 feature.

} else {
woc.markWorkflowPhase(wfv1.NodeError, err.Error())
}
}

// stepsOrDagSeparator identifies if a node name starts with our naming convention separator from
Expand Down
14 changes: 2 additions & 12 deletions workflow/hydrator/hydrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"os"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -53,11 +51,7 @@ func (h hydrator) Hydrate(wf *wfv1.Workflow) error {
return err
}
if wf.Status.IsOffloadNodeStatus() {
var offloadedNodes wfv1.Nodes
err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
return err == nil, err
})
offloadedNodes, err := h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return err
}
Expand All @@ -79,11 +73,7 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error {
}
}
if packer.IsTooLargeError(err) || alwaysOffloadNodeStatus {
var offloadVersion string
err := wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
return err == nil, err
})
offloadVersion, err := h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
if err != nil {
return err
}
Expand Down