Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support specifying the pattern for transient and retryable errors #4889

Merged
merged 6 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type Config struct {
// MainContainer holds container customization for the main container
MainContainer *apiv1.Container `json:"mainContainer,omitempty"`

// TransientErrorPattern specifies the pattern to match for errors that can be seen as transient
// and retryable.
TransientErrorPattern string `json:"transientErrorPattern,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be a rarely used feature, so an environment variable would be the way to configure it - it is much simpler too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our case, we'd like to whitelist non-critical errors that can be retried for issues during cluster maintenance/migration, etc., especially for non-production clusters where certain services may not be available.

Good suggestion. I'll add an env var to configure this instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updated. Please take another look.


// KubeConfig specifies a kube config file for the wait & init containers
KubeConfig *KubeConfig `json:"kubeConfig,omitempty"`

Expand Down
13 changes: 11 additions & 2 deletions util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,28 @@ package errors
import (
"net"
"net/url"
"regexp"
"strings"

apierr "k8s.io/apimachinery/pkg/api/errors"

argoerrs "github.com/argoproj/argo/errors"
)

func IsTransientErr(err error) bool {
func IsTransientErr(err error, pattern string) bool {
if err == nil {
return false
}
err = argoerrs.Cause(err)
return isExceededQuotaErr(err) || apierr.IsTooManyRequests(err) || isResourceQuotaConflictErr(err) || isTransientNetworkErr(err) || apierr.IsServerTimeout(err) || apierr.IsServiceUnavailable(err)
return isExceededQuotaErr(err) || apierr.IsTooManyRequests(err) || isResourceQuotaConflictErr(err) || isTransientNetworkErr(err) || apierr.IsServerTimeout(err) || apierr.IsServiceUnavailable(err) || matchTransientErrPattern(err, pattern)
}

// matchTransientErrPattern reports whether err's message matches the
// user-configured regular expression pattern. An empty pattern never
// matches, making the feature opt-in.
func matchTransientErrPattern(err error, pattern string) bool {
	if pattern == "" {
		return false
	}
	// An invalid pattern (or any other regexp error) is deliberately treated
	// as "no match" rather than surfacing an error to the retry machinery.
	matched, regexpErr := regexp.MatchString(pattern, err.Error())
	return regexpErr == nil && matched
}

func isExceededQuotaErr(err error) bool {
Expand Down
34 changes: 20 additions & 14 deletions util/errors/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,48 @@ func (n netError) Temporary() bool { return false }
var tlsHandshakeTimeoutErr net.Error = netError("net/http: TLS handshake timeout")
var ioTimeoutErr net.Error = netError("i/o timeout")
var connectionTimedout net.Error = netError("connection timed out")
var transientErr net.Error = netError("this error is transient")

// TestIsTransientErr covers each built-in transient-error rule with the
// pattern argument empty, plus the new user-configured pattern behavior.
func TestIsTransientErr(t *testing.T) {
	t.Run("Nil", func(t *testing.T) {
		assert.False(t, IsTransientErr(nil, ""))
	})
	t.Run("ResourceQuotaConflictErr", func(t *testing.T) {
		assert.False(t, IsTransientErr(apierr.NewConflict(schema.GroupResource{}, "", nil), ""))
		assert.True(t, IsTransientErr(apierr.NewConflict(schema.GroupResource{Group: "v1", Resource: "resourcequotas"}, "", nil), ""))
	})
	t.Run("ExceededQuotaErr", func(t *testing.T) {
		assert.False(t, IsTransientErr(apierr.NewForbidden(schema.GroupResource{}, "", nil), ""))
		assert.True(t, IsTransientErr(apierr.NewForbidden(schema.GroupResource{Group: "v1", Resource: "pods"}, "", errors.New("exceeded quota")), ""))
	})
	t.Run("TooManyRequestsDNS", func(t *testing.T) {
		assert.True(t, IsTransientErr(apierr.NewTooManyRequests("", 0), ""))
	})
	t.Run("DNSError", func(t *testing.T) {
		assert.True(t, IsTransientErr(&net.DNSError{}, ""))
	})
	t.Run("OpError", func(t *testing.T) {
		assert.True(t, IsTransientErr(&net.OpError{}, ""))
	})
	t.Run("UnknownNetworkError", func(t *testing.T) {
		assert.True(t, IsTransientErr(net.UnknownNetworkError(""), ""))
	})
	t.Run("ConnectionClosedErr", func(t *testing.T) {
		assert.False(t, IsTransientErr(&url.Error{Err: errors.New("")}, ""))
		assert.True(t, IsTransientErr(&url.Error{Err: errors.New("Connection closed by foreign host")}, ""))
	})
	t.Run("TLSHandshakeTimeout", func(t *testing.T) {
		assert.True(t, IsTransientErr(tlsHandshakeTimeoutErr, ""))
	})
	t.Run("IOHandshakeTimeout", func(t *testing.T) {
		assert.True(t, IsTransientErr(ioTimeoutErr, ""))
	})
	t.Run("ConnectionTimeout", func(t *testing.T) {
		assert.True(t, IsTransientErr(connectionTimedout, ""))
	})
	t.Run("TransientErrorPattern", func(t *testing.T) {
		assert.True(t, IsTransientErr(transientErr, "this error is transient"))
		assert.False(t, IsTransientErr(transientErr, "this error is not transient"))
		assert.False(t, IsTransientErr(transientErr, ""))
	})
}
2 changes: 1 addition & 1 deletion util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func GetSecrets(ctx context.Context, clientSet kubernetes.Interface, namespace,
secret, err = secretsIf.Get(ctx, name, metav1.GetOptions{})
if err != nil {
log.Warnf("Failed to get secret '%s': %v", name, err)
if !errorsutil.IsTransientErr(err) {
if !errorsutil.IsTransientErr(err, "") {
return false, err
}
return false, nil
Expand Down
2 changes: 1 addition & 1 deletion workflow/artifactrepositories/artifactrepositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *artifactRepositories) get(ctx context.Context, ref *wfv1.ArtifactReposi
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
var err error
cm, err = s.kubernetesInterface.CoreV1().ConfigMaps(namespace).Get(ctx, configMap, metav1.GetOptions{})
return err == nil || !errorsutil.IsTransientErr(err), err
return err == nil || !errorsutil.IsTransientErr(err, ""), err
})
if err != nil {
return nil, nil, err
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (wfc *WorkflowController) processNextPodCleanupItem(ctx context.Context) bo
}()
if err != nil {
logCtx.WithError(err).Warn("failed to clean-up pod")
if errorsutil.IsTransientErr(err) {
if errorsutil.IsTransientErr(err, wfc.Config.TransientErrorPattern) {
wfc.podCleanupQueue.AddRateLimited(key)
}
}
Expand Down Expand Up @@ -606,7 +606,7 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {

err = wfc.hydrator.Hydrate(woc.wf)
if err != nil {
transientErr := errorsutil.IsTransientErr(err)
transientErr := errorsutil.IsTransientErr(err, wfc.Config.TransientErrorPattern)
woc.log.WithField("transientErr", transientErr).Errorf("hydration failed: %v", err)
if !transientErr {
woc.markWorkflowError(ctx, err)
Expand Down
6 changes: 3 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {

err = woc.createPVCs(ctx)
if err != nil {
if errorsutil.IsTransientErr(err) {
if errorsutil.IsTransientErr(err, woc.controller.Config.TransientErrorPattern) {
// Error was most likely caused by a lack of resources.
// In this case, Workflow will be in pending state and requeue.
woc.markWorkflowPhase(ctx, wfv1.NodePending, fmt.Sprintf("Waiting for a PVC to be created. %v", err))
Expand Down Expand Up @@ -2374,7 +2374,7 @@ func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, t
}

func (woc *wfOperationCtx) requeueIfTransientErr(err error, nodeName string) (*wfv1.NodeStatus, error) {
if errorsutil.IsTransientErr(err) {
if errorsutil.IsTransientErr(err, woc.controller.Config.TransientErrorPattern) {
// Our error was most likely caused by a lack of resources.
woc.requeue()
return woc.markNodePending(nodeName, err), nil
Expand Down Expand Up @@ -3085,7 +3085,7 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {
err := woc.controller.kubeclientset.PolicyV1beta1().PodDisruptionBudgets(woc.wf.Namespace).Delete(ctx, woc.wf.Name, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
woc.log.WithField("err", err).Warn("Failed to delete PDB.")
if !errorsutil.IsTransientErr(err) {
if !errorsutil.IsTransientErr(err, woc.controller.Config.TransientErrorPattern) {
return false, err
}
return false, nil
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
woc.log.Infof("Failed pod %s (%s) creation: already exists", nodeName, nodeID)
return created, nil
}
if errorsutil.IsTransientErr(err) {
if errorsutil.IsTransientErr(err, woc.controller.Config.TransientErrorPattern) {
return nil, err
}
woc.log.Infof("Failed to create pod %s (%s): %v", nodeName, nodeID, err)
Expand Down
2 changes: 1 addition & 1 deletion workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (woc *cronWfOperationCtx) patch(ctx context.Context, patch map[string]inter
err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
cronWf, err := woc.cronWfIf.Patch(ctx, woc.cronWf.Name, types.MergePatchType, data, v1.PatchOptions{})
if err != nil {
if argoerr.IsTransientErr(err) {
if argoerr.IsTransientErr(err, "") {
return false, nil
}
return false, err
Expand Down
2 changes: 1 addition & 1 deletion workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (p *PNSExecutor) GetTerminatedContainerStatus(ctx context.Context, containe
err := wait.ExponentialBackoff(backoffOver30s, func() (bool, error) {
podRes, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, p.podName, metav1.GetOptions{})
if err != nil {
return !errorsutil.IsTransientErr(err), fmt.Errorf("could not get pod: %w", err)
return !errorsutil.IsTransientErr(err, ""), fmt.Errorf("could not get pod: %w", err)
}
for _, containerStatusRes := range podRes.Status.ContainerStatuses {
if execcommon.GetContainerID(&containerStatusRes) != containerID {
Expand Down