fix: Surface the underlying error on wait timeout. (#4966)
Signed-off-by: Alex Collins <alex_collins@intuit.com>
alexec committed Jan 29, 2021
1 parent a00aa32 commit 655c7e2
Showing 14 changed files with 111 additions and 88 deletions.
server/event/dispatch/operation.go (6 changes: 3 additions & 3 deletions)
@@ -13,7 +13,6 @@ import (
     "google.golang.org/grpc/metadata"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/retry"

@@ -22,6 +21,7 @@ import (
     errorsutil "github.com/argoproj/argo/v2/util/errors"
     "github.com/argoproj/argo/v2/util/instanceid"
     "github.com/argoproj/argo/v2/util/labels"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
     "github.com/argoproj/argo/v2/workflow/common"
     "github.com/argoproj/argo/v2/workflow/creator"
 )
@@ -58,9 +58,9 @@ func (o *Operation) Dispatch(ctx context.Context) {
     // we use a predicable suffix for the name so that lost connections cannot result in the same workflow being created twice
     // being created twice
     nameSuffix := fmt.Sprintf("%v", time.Now().Unix())
-    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+    err := waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
         _, err := o.dispatch(ctx, event, nameSuffix)
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         log.WithError(err).WithFields(log.Fields{"namespace": event.Namespace, "event": event.Name}).Error("failed to dispatch from event")
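The remaining files repeat the change shown above at every retry call site: wait.ExponentialBackoff is swapped for the new waitutil.Backoff, and the condition function now returns the raw error next to the done flag instead of collapsing it through errorsutil.Done (deleted below). A minimal sketch of the shape a call site takes after this commit; the package name and getThing are placeholders, not code from this repository:

    package example

    import (
        "k8s.io/client-go/util/retry"

        errorsutil "github.com/argoproj/argo/v2/util/errors"
        waitutil "github.com/argoproj/argo/v2/util/wait"
    )

    // getThing is a placeholder for whatever API call is being retried.
    func getThing() error { return nil }

    // fetchWithRetry retries getThing on transient errors. If it ultimately times
    // out, waitutil.Backoff appends the last error to the timeout message instead
    // of returning a bare "timed out waiting for the condition".
    func fetchWithRetry() error {
        return waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
            err := getThing()
            return !errorsutil.IsTransientErr(err), err
        })
    }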
util/errors/done.go (12 changes: 0 additions & 12 deletions)

This file was deleted.

util/errors/done_test.go (27 changes: 0 additions & 27 deletions)

This file was deleted.

util/util.go (6 changes: 3 additions & 3 deletions)
@@ -12,13 +12,13 @@ import (

     apiv1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"

     "github.com/argoproj/argo/v2/errors"
     wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
     errorsutil "github.com/argoproj/argo/v2/util/errors"
     "github.com/argoproj/argo/v2/util/retry"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
 )

 type Closer interface {
@@ -36,10 +36,10 @@ func GetSecrets(ctx context.Context, clientSet kubernetes.Interface, namespace,

     secretsIf := clientSet.CoreV1().Secrets(namespace)
     var secret *apiv1.Secret
-    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+    err := waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
         var err error
         secret, err = secretsIf.Get(ctx, name, metav1.GetOptions{})
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         return []byte{}, errors.InternalWrapError(err)
util/wait/backoff.go (26 changes: 26 additions & 0 deletions)
@@ -0,0 +1,26 @@
+package wait
+
+import (
+    "fmt"
+
+    "k8s.io/apimachinery/pkg/util/wait"
+)
+
+// the underlying ExponentialBackoff does not retain the underlying error
+// so this addresses this
+func Backoff(b wait.Backoff, f func() (bool, error)) error {
+    var err error
+    waitErr := wait.ExponentialBackoff(b, func() (bool, error) {
+        var done bool
+        done, err = f()
+        return done, nil
+    })
+    if waitErr != nil {
+        if err != nil {
+            return fmt.Errorf("%v: %v", waitErr, err)
+        } else {
+            return waitErr
+        }
+    }
+    return err
+}
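To see why this wrapper exists, here is a minimal, self-contained sketch that drives the new helper; the main function and its "connection refused" error are illustrative only. Plain wait.ExponentialBackoff would return only wait.ErrWaitTimeout here, whereas Backoff keeps the last error from the condition and appends it to the timeout message:

    package main

    import (
        "errors"
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"

        waitutil "github.com/argoproj/argo/v2/util/wait"
    )

    func main() {
        // A condition that never succeeds and always reports the same cause.
        cause := errors.New("connection refused")
        err := waitutil.Backoff(wait.Backoff{Steps: 3, Duration: 10 * time.Millisecond}, func() (bool, error) {
            return false, cause
        })
        // Prints "timed out waiting for the condition: connection refused"
        // rather than only "timed out waiting for the condition".
        fmt.Println(err)
    }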
util/wait/backoff_test.go (36 changes: 36 additions & 0 deletions)
@@ -0,0 +1,36 @@
+package wait
+
+import (
+    "errors"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    "k8s.io/apimachinery/pkg/util/wait"
+)
+
+func TestExponentialBackoff2(t *testing.T) {
+    t.Run("NoError", func(t *testing.T) {
+        err := Backoff(wait.Backoff{Steps: 1}, func() (bool, error) {
+            return true, nil
+        })
+        assert.NoError(t, err)
+    })
+    t.Run("Error", func(t *testing.T) {
+        err := Backoff(wait.Backoff{Steps: 1}, func() (bool, error) {
+            return true, errors.New("foo")
+        })
+        assert.EqualError(t, err, "foo")
+    })
+    t.Run("Timeout", func(t *testing.T) {
+        err := Backoff(wait.Backoff{Steps: 1}, func() (bool, error) {
+            return false, nil
+        })
+        assert.Equal(t, err, wait.ErrWaitTimeout)
+    })
+    t.Run("TimeoutError", func(t *testing.T) {
+        err := Backoff(wait.Backoff{Steps: 1}, func() (bool, error) {
+            return false, errors.New("foo")
+        })
+        assert.EqualError(t, err, "timed out waiting for the condition: foo")
+    })
+}
workflow/artifactrepositories/artifactrepositories.go (6 changes: 3 additions & 3 deletions)
@@ -8,14 +8,14 @@ import (
     v1 "k8s.io/api/core/v1"
     apierr "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"
     "sigs.k8s.io/yaml"

     "github.com/argoproj/argo/v2/config"
     wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
     errorsutil "github.com/argoproj/argo/v2/util/errors"
     "github.com/argoproj/argo/v2/util/retry"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
 )

 //go:generate mockery -name Interface
@@ -76,10 +76,10 @@ func (s *artifactRepositories) get(ctx context.Context, ref *wfv1.ArtifactReposi
     var cm *v1.ConfigMap
     namespace := ref.Namespace
     configMap := ref.GetConfigMapOr("artifact-repositories")
-    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+    err := waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
        var err error
         cm, err = s.kubernetesInterface.CoreV1().ConfigMaps(namespace).Get(ctx, configMap, metav1.GetOptions{})
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         return nil, nil, err
workflow/common/util.go (5 changes: 3 additions & 2 deletions)
@@ -32,6 +32,7 @@ import (
     wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
     "github.com/argoproj/argo/v2/util"
     errorsutil "github.com/argoproj/argo/v2/util/errors"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
 )

 // FindOverlappingVolume looks an artifact path, checks if it overlaps with any
@@ -477,9 +478,9 @@ func addPodMetadata(ctx context.Context, c kubernetes.Interface, field, podName,
     if err != nil {
         return errors.InternalWrapError(err)
     }
-    return wait.ExponentialBackoff(backoff, func() (bool, error) {
+    return waitutil.Backoff(backoff, func() (bool, error) {
         _, err := c.CoreV1().Pods(namespace).Patch(ctx, podName, types.MergePatchType, patch, metav1.PatchOptions{})
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
 }

workflow/controller/operator.go (6 changes: 3 additions & 3 deletions)
@@ -26,7 +26,6 @@ import (
     apierr "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
     "k8s.io/utils/pointer"
@@ -44,6 +43,7 @@ import (
     "github.com/argoproj/argo/v2/util/intstr"
     "github.com/argoproj/argo/v2/util/resource"
     "github.com/argoproj/argo/v2/util/retry"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
     "github.com/argoproj/argo/v2/workflow/common"
     controllercache "github.com/argoproj/argo/v2/workflow/controller/cache"
     "github.com/argoproj/argo/v2/workflow/controller/estimation"
@@ -3098,12 +3098,12 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {
     if woc.execWf.Spec.PodDisruptionBudget == nil {
         return nil
     }
-    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+    err := waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
         err := woc.controller.kubeclientset.PolicyV1beta1().PodDisruptionBudgets(woc.wf.Namespace).Delete(ctx, woc.wf.Name, metav1.DeleteOptions{})
         if apierr.IsNotFound(err) {
             return true, nil
         }
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         woc.log.WithField("err", err).Error("Unable to delete PDB resource for workflow.")
workflow/cron/operator.go (6 changes: 3 additions & 3 deletions)
@@ -13,13 +13,13 @@ import (
     "k8s.io/apimachinery/pkg/api/errors"
     v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/util/retry"

     "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
     "github.com/argoproj/argo/v2/pkg/client/clientset/versioned"
     typed "github.com/argoproj/argo/v2/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
     errorsutil "github.com/argoproj/argo/v2/util/errors"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
     "github.com/argoproj/argo/v2/workflow/common"
     "github.com/argoproj/argo/v2/workflow/metrics"
     "github.com/argoproj/argo/v2/workflow/templateresolution"
@@ -142,10 +142,10 @@ func (woc *cronWfOperationCtx) patch(ctx context.Context, patch map[string]inter
         woc.log.WithError(err).Error("failed to marshall cron workflow status.active data")
         return
     }
-    err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
+    err = waitutil.Backoff(retry.DefaultBackoff, func() (bool, error) {
         cronWf, err := woc.cronWfIf.Patch(ctx, woc.cronWf.Name, types.MergePatchType, data, v1.PatchOptions{})
         if err != nil {
-            return errorsutil.Done(err)
+            return !errorsutil.IsTransientErr(err), err
         }
         woc.cronWf = cronWf
         return true, nil
workflow/executor/executor.go (25 changes: 11 additions & 14 deletions)
@@ -34,6 +34,7 @@ import (
     "github.com/argoproj/argo/v2/util/archive"
     errorsutil "github.com/argoproj/argo/v2/util/errors"
     "github.com/argoproj/argo/v2/util/retry"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
     artifact "github.com/argoproj/argo/v2/workflow/artifacts"
     "github.com/argoproj/argo/v2/workflow/common"
     os_specific "github.com/argoproj/argo/v2/workflow/executor/os-specific"
@@ -610,10 +611,10 @@ func (we *WorkflowExecutor) InitDriver(ctx context.Context, art *wfv1.Artifact)
 func (we *WorkflowExecutor) getPod(ctx context.Context) (*apiv1.Pod, error) {
     podsIf := we.ClientSet.CoreV1().Pods(we.Namespace)
     var pod *apiv1.Pod
-    err := wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
+    err := waitutil.Backoff(ExecutorRetry, func() (bool, error) {
         var err error
         pod, err = podsIf.Get(ctx, we.PodName, metav1.GetOptions{})
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         return nil, errors.InternalWrapError(err)
@@ -630,10 +631,10 @@ func (we *WorkflowExecutor) GetConfigMapKey(ctx context.Context, name, key strin
     }
     configmapsIf := we.ClientSet.CoreV1().ConfigMaps(namespace)
     var configmap *apiv1.ConfigMap
-    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+    err := waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
         var err error
         configmap, err = configmapsIf.Get(ctx, name, metav1.GetOptions{})
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         return "", errors.InternalWrapError(err)
@@ -658,10 +659,10 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key
     }
     secretsIf := we.ClientSet.CoreV1().Secrets(namespace)
     var secret *apiv1.Secret
-    err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
+    err := waitutil.Backoff(retry.DefaultRetry, func() (bool, error) {
         var err error
         secret, err = secretsIf.Get(ctx, name, metav1.GetOptions{})
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         return []byte{}, errors.InternalWrapError(err)
@@ -994,13 +995,9 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error {
     annotationUpdatesCh := we.monitorAnnotations(ctx)
     go we.monitorDeadline(ctx, annotationUpdatesCh)

-    err = wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
+    err = waitutil.Backoff(ExecutorRetry, func() (bool, error) {
         err := we.RuntimeExecutor.Wait(ctx, mainContainerID)
-        if err != nil {
-            log.Warnf("Failed to wait for container id '%s': %v", mainContainerID, err)
-            return false, nil
-        }
-        return true, nil
+        return err == nil, err
     })
     if err != nil {
         return err
@@ -1020,10 +1017,10 @@ func (we *WorkflowExecutor) waitMainContainerStart(ctx context.Context) (string,

     var watchIf watch.Interface

-    err := wait.ExponentialBackoff(ExecutorRetry, func() (bool, error) {
+    err := waitutil.Backoff(ExecutorRetry, func() (bool, error) {
         var err error
         watchIf, err = podsIf.Watch(ctx, opts)
-        return errorsutil.Done(err)
+        return !errorsutil.IsTransientErr(err), err
     })
     if err != nil {
         return "", errors.InternalWrapErrorf(err, "Failed to establish pod watch: %v", err)
workflow/executor/pns/pns.go (5 changes: 3 additions & 2 deletions)
@@ -23,6 +23,7 @@ import (
     "github.com/argoproj/argo/v2/errors"
     "github.com/argoproj/argo/v2/util/archive"
     errorsutil "github.com/argoproj/argo/v2/util/errors"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
     "github.com/argoproj/argo/v2/workflow/common"
     execcommon "github.com/argoproj/argo/v2/workflow/executor/common"
     argowait "github.com/argoproj/argo/v2/workflow/executor/common/wait"
@@ -382,10 +383,10 @@ func (p *PNSExecutor) GetTerminatedContainerStatus(ctx context.Context, containe
     var containerStatus *corev1.ContainerStatus
     // Under high load, the Kubernetes API may be unresponsive for some time (30s). This would have failed the workflow
     // previously (<=v2.11) but a 30s back-off mitigates this.
-    err := wait.ExponentialBackoff(backoffOver30s, func() (bool, error) {
+    err := waitutil.Backoff(backoffOver30s, func() (bool, error) {
         podRes, err := p.clientset.CoreV1().Pods(p.namespace).Get(ctx, p.podName, metav1.GetOptions{})
         if err != nil {
-            return errorsutil.Done(err)
+            return !errorsutil.IsTransientErr(err), err
         }
         for _, containerStatusRes := range podRes.Status.ContainerStatuses {
             if execcommon.GetContainerID(&containerStatusRes) != containerID {
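The backoffOver30s value referenced in this hunk is not part of the diff. Purely as a hedged illustration of the comment above (ride out an API server that is unresponsive for roughly 30s), a backoff with that property could look like the sketch below; the actual values in the repository may differ:

    package pns

    import (
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    // Illustrative only: a wait.Backoff whose cumulative sleep exceeds 30s
    // (1+2+4+8+16s between six attempts), so a briefly unresponsive Kubernetes
    // API no longer fails the workflow outright.
    var backoffOver30s = wait.Backoff{
        Duration: 1 * time.Second,
        Factor:   2,
        Steps:    6,
    }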
workflow/hydrator/hydrator.go (9 changes: 5 additions & 4 deletions)
@@ -10,6 +10,7 @@ import (
     "github.com/argoproj/argo/v2/persist/sqldb"
     wfv1 "github.com/argoproj/argo/v2/pkg/apis/workflow/v1alpha1"
     errorsutil "github.com/argoproj/argo/v2/util/errors"
+    waitutil "github.com/argoproj/argo/v2/util/wait"
     "github.com/argoproj/argo/v2/workflow/packer"
 )

@@ -74,9 +75,9 @@ func (h hydrator) Hydrate(wf *wfv1.Workflow) error {
     }
     if wf.Status.IsOffloadNodeStatus() {
         var offloadedNodes wfv1.Nodes
-        err := wait.ExponentialBackoff(readRetry, func() (bool, error) {
+        err := waitutil.Backoff(readRetry, func() (bool, error) {
            offloadedNodes, err = h.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
-            return errorsutil.Done(err)
+            return !errorsutil.IsTransientErr(err), err
         })
         if err != nil {
             return err
@@ -100,9 +101,9 @@ func (h hydrator) Dehydrate(wf *wfv1.Workflow) error {
     }
     if packer.IsTooLargeError(err) || alwaysOffloadNodeStatus {
         var offloadVersion string
-        err := wait.ExponentialBackoff(writeRetry, func() (bool, error) {
+        err := waitutil.Backoff(writeRetry, func() (bool, error) {
            offloadVersion, err = h.offloadNodeStatusRepo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
-            return errorsutil.Done(err)
+            return !errorsutil.IsTransientErr(err), err
         })
         if err != nil {
             return err
