Skip to content

Commit

Permalink
Add robustness to wait status checks
Browse files Browse the repository at this point in the history
When a user specifies --wait for an install or upgrade this commit will enable the user to specifiy a number of retries to attepmt if a status check fails
Errors including a HTTP status code < 500 will not be retried

Signed-off-by: MichaelMorris <michael.morris@est.tech>
  • Loading branch information
MichaelMorrisEst committed Jul 13, 2023
1 parent 99e1dce commit fc74964
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 22 deletions.
1 change: 1 addition & 0 deletions cmd/helm/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func addInstallFlags(cmd *cobra.Command, f *pflag.FlagSet, client *action.Instal
f.DurationVar(&client.Timeout, "timeout", 300*time.Second, "time to wait for any individual Kubernetes operation (like Jobs for hooks)")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.IntVar(&client.WaitRetries, "wait-retries", 0, "if set and --wait enabled, will retry any failed check on resource state subject to the specified number of retries")
f.BoolVarP(&client.GenerateName, "generate-name", "g", false, "generate the name (and omit the NAME parameter)")
f.StringVar(&client.NameTemplate, "name-template", "", "specify template used to name the release")
f.StringVar(&client.Description, "description", "", "add a custom description")
Expand Down
2 changes: 2 additions & 0 deletions cmd/helm/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
instClient.Timeout = client.Timeout
instClient.Wait = client.Wait
instClient.WaitForJobs = client.WaitForJobs
instClient.WaitRetries = client.WaitRetries
instClient.Devel = client.Devel
instClient.Namespace = client.Namespace
instClient.Atomic = client.Atomic
Expand Down Expand Up @@ -233,6 +234,7 @@ func newUpgradeCmd(cfg *action.Configuration, out io.Writer) *cobra.Command {
f.BoolVar(&client.ReuseValues, "reuse-values", false, "when upgrading, reuse the last release's values and merge in any overrides from the command line via --set and -f. If '--reset-values' is specified, this is ignored")
f.BoolVar(&client.Wait, "wait", false, "if set, will wait until all Pods, PVCs, Services, and minimum number of Pods of a Deployment, StatefulSet, or ReplicaSet are in a ready state before marking the release as successful. It will wait for as long as --timeout")
f.BoolVar(&client.WaitForJobs, "wait-for-jobs", false, "if set and --wait enabled, will wait until all Jobs have been completed before marking the release as successful. It will wait for as long as --timeout")
f.IntVar(&client.WaitRetries, "wait-retries", 0, "if set and --wait enabled, will retry any failed check on resource state, except if HTTP status code < 500 is received, subject to the specified number of retries")
f.BoolVar(&client.Atomic, "atomic", false, "if set, upgrade process rolls back changes made in case of failed upgrade. The --wait flag will be set automatically if --atomic is used")
f.IntVar(&client.MaxHistory, "history-max", settings.MaxHistory, "limit the maximum number of revisions saved per release. Use 0 for no limit")
f.BoolVar(&client.CleanupOnFail, "cleanup-on-fail", false, "allow deletion of new resources created in this upgrade when upgrade fails")
Expand Down
20 changes: 14 additions & 6 deletions pkg/action/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Install struct {
Replace bool
Wait bool
WaitForJobs bool
WaitRetries int
Devel bool
DependencyUpdate bool
Timeout time.Duration
Expand Down Expand Up @@ -413,17 +414,24 @@ func (i *Install) performInstall(c chan<- resultMessage, rel *release.Release, t
}

if i.Wait {
var err error
if i.WaitForJobs {
if err := i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
if kubeClient, ok := i.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
err = kubeClient.WaitWithJobsWithRetry(resources, i.Timeout, i.WaitRetries)
} else {
err = i.cfg.KubeClient.WaitWithJobs(resources, i.Timeout)
}
} else {
if err := i.cfg.KubeClient.Wait(resources, i.Timeout); err != nil {
i.reportToRun(c, rel, err)
return
if kubeClient, ok := i.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
err = kubeClient.WaitWithRetry(resources, i.Timeout, i.WaitRetries)
} else {
err = i.cfg.KubeClient.Wait(resources, i.Timeout)
}
}
if err != nil {
i.reportToRun(c, rel, err)
return
}
}

if !i.DisableHooks {
Expand Down
16 changes: 16 additions & 0 deletions pkg/action/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,22 @@ func TestInstallRelease_Wait_Interrupted(t *testing.T) {
is.Contains(res.Info.Description, "Release \"interrupted-release\" failed: context canceled")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_Wait_With_Retries(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
instAction.ReleaseName = "come-fail-away"
failer := instAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out")
instAction.cfg.KubeClient = failer
instAction.Wait = true
instAction.WaitRetries = 2
vals := map[string]interface{}{}

res, err := instAction.Run(buildChart(), vals)
is.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestInstallRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
instAction := installAction(t)
Expand Down
24 changes: 16 additions & 8 deletions pkg/action/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type Upgrade struct {
Wait bool
// WaitForJobs determines whether the wait operation for the Jobs should be performed after the upgrade is requested.
WaitForJobs bool
// WaitRetries determines whether any failed resource state checks will be retried during the wait operation.
WaitRetries int
// DisableHooks disables hook processing if set to true.
DisableHooks bool
// DryRun controls whether the operation is prepared, but not executed.
Expand Down Expand Up @@ -402,19 +404,25 @@ func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *rele
u.cfg.Log(
"waiting for release %s resources (created: %d updated: %d deleted: %d)",
upgradedRelease.Name, len(results.Created), len(results.Updated), len(results.Deleted))
var err error
if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
err = kubeClient.WaitWithJobsWithRetry(target, u.Timeout, u.WaitRetries)
} else {
err = u.cfg.KubeClient.WaitWithJobs(target, u.Timeout)
}
} else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
if kubeClient, ok := u.cfg.KubeClient.(kube.InterfaceWithRetry); ok {
err = kubeClient.WaitWithRetry(target, u.Timeout, u.WaitRetries)
} else {
err = u.cfg.KubeClient.Wait(target, u.Timeout)
}
}
if err != nil {
u.cfg.recordRelease(originalRelease)
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
}

// post-upgrade hooks
Expand Down
46 changes: 46 additions & 0 deletions pkg/action/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,28 @@ func TestUpgradeRelease_Wait(t *testing.T) {
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}
func TestUpgradeRelease_Wait_With_Retries(t *testing.T) {
is := assert.New(t)
req := require.New(t)

upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "come-fail-away"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)

failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out")
upAction.cfg.KubeClient = failer
upAction.Wait = true
upAction.WaitRetries = 2
vals := map[string]interface{}{}

res, err := upAction.Run(rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}

func TestUpgradeRelease_WaitForJobs(t *testing.T) {
is := assert.New(t)
Expand All @@ -112,6 +134,30 @@ func TestUpgradeRelease_WaitForJobs(t *testing.T) {
is.Equal(res.Info.Status, release.StatusFailed)
}

func TestUpgradeRelease_WaitForJobs_With_Retries(t *testing.T) {
is := assert.New(t)
req := require.New(t)

upAction := upgradeAction(t)
rel := releaseStub()
rel.Name = "come-fail-away"
rel.Info.Status = release.StatusDeployed
upAction.cfg.Releases.Create(rel)

failer := upAction.cfg.KubeClient.(*kubefake.FailingKubeClient)
failer.WaitError = fmt.Errorf("I timed out")
upAction.cfg.KubeClient = failer
upAction.Wait = true
upAction.WaitForJobs = true
upAction.WaitRetries = 2
vals := map[string]interface{}{}

res, err := upAction.Run(rel.Name, buildChart(), vals)
req.Error(err)
is.Contains(res.Info.Description, "I timed out")
is.Equal(res.Info.Status, release.StatusFailed)
}

func TestUpgradeRelease_CleanupOnFail(t *testing.T) {
is := assert.New(t)
req := require.New(t)
Expand Down
18 changes: 16 additions & 2 deletions pkg/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ func getResource(info *resource.Info) (runtime.Object, error) {

// Wait waits up to the given timeout for the specified resources to be ready.
func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
return c.WaitWithRetry(resources, timeout, 0)
}

// WaitWithRetry waits up to the given timeout for the specified resources to be ready. If an error
// is encountered when checking on the status of a resource then retries will be performed subject
// to the given maximum number of retries
func (c *Client) WaitWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error {
cs, err := c.getKubeClient()
if err != nil {
return err
Expand All @@ -285,11 +292,18 @@ func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return w.waitForResources(resources, waitRetries)
}

// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
return c.WaitWithJobsWithRetry(resources, timeout, 0)
}

// WaitWithJobsWithRetry waits up to the given timeout for the specified resources to be ready, including jobs.
// If an error is encountered when checking on the status of a resource then retries will be performed subject
// to the given maximum number of retries
func (c *Client) WaitWithJobsWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error {
cs, err := c.getKubeClient()
if err != nil {
return err
Expand All @@ -300,7 +314,7 @@ func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) err
log: c.Log,
timeout: timeout,
}
return w.waitForResources(resources)
return w.waitForResources(resources, waitRetries)
}

// WaitForDelete wait up to the given timeout for the specified resources to be deleted.
Expand Down
17 changes: 17 additions & 0 deletions pkg/kube/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ func (f *FailingKubeClient) Wait(resources kube.ResourceList, d time.Duration) e
return f.PrintingKubeClient.Wait(resources, d)
}

// Waits the amount of time defined on f.WaitDuration, then returns the configured error if set or prints.
func (f *FailingKubeClient) WaitWithRetry(resources kube.ResourceList, d time.Duration, waitRetries int) error {
time.Sleep(f.WaitDuration)
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithRetry(resources, d, waitRetries)
}

// WaitWithJobs returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {
Expand All @@ -82,6 +91,14 @@ func (f *FailingKubeClient) WaitWithJobs(resources kube.ResourceList, d time.Dur
return f.PrintingKubeClient.WaitWithJobs(resources, d)
}

// WaitWithJobs returns the configured error if set or prints
func (f *FailingKubeClient) WaitWithJobsWithRetry(resources kube.ResourceList, d time.Duration, waitRetries int) error {
if f.WaitError != nil {
return f.WaitError
}
return f.PrintingKubeClient.WaitWithJobsWithRetry(resources, d, waitRetries)
}

// WaitForDelete returns the configured error if set or prints
func (f *FailingKubeClient) WaitForDelete(resources kube.ResourceList, d time.Duration) error {
if f.WaitError != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/kube/fake/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,21 @@ func (p *PrintingKubeClient) Wait(resources kube.ResourceList, _ time.Duration)
return err
}

func (p *PrintingKubeClient) WaitWithRetry(resources kube.ResourceList, _ time.Duration, _ int) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}

func (p *PrintingKubeClient) WaitWithJobs(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}

func (p *PrintingKubeClient) WaitWithJobsWithRetry(resources kube.ResourceList, _ time.Duration, _ int) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
}

func (p *PrintingKubeClient) WaitForDelete(resources kube.ResourceList, _ time.Duration) error {
_, err := io.Copy(p.Out, bufferize(resources))
return err
Expand Down
12 changes: 12 additions & 0 deletions pkg/kube/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,19 @@ type InterfaceResources interface {
BuildTable(reader io.Reader, validate bool) (ResourceList, error)
}

// InterfaceWithRetry is introduced to avoid breaking backwards compatibility for Interface implementers.
//
// TODO Helm 4: Remove InterfaceWithRetry and integrate its method(s) into the Interface.
type InterfaceWithRetry interface {
// Wait waits up to the given timeout for the specified resources to be ready.
WaitWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error

// WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
WaitWithJobsWithRetry(resources ResourceList, timeout time.Duration, waitRetries int) error
}

var _ Interface = (*Client)(nil)
var _ InterfaceExt = (*Client)(nil)
var _ InterfaceDeletionPropagation = (*Client)(nil)
var _ InterfaceResources = (*Client)(nil)
var _ InterfaceWithRetry = (*Client)(nil)
45 changes: 39 additions & 6 deletions pkg/kube/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/cli-runtime/pkg/resource"

"k8s.io/apimachinery/pkg/util/wait"
)
Expand All @@ -42,25 +43,57 @@ type waiter struct {
log func(string, ...interface{})
}

// waitForResources polls to get the current status of all pods, PVCs, Services and
// Jobs(optional) until all are ready or a timeout is reached
func (w *waiter) waitForResources(created ResourceList) error {
w.log("beginning wait for %d resources with timeout of %v", len(created), w.timeout)
// Jobs(optional) until all are ready or a timeout is reached.
// If an error is encountered when checking on the status of a resource then retries
// will be performed subject to the given maximum number of retries
func (w *waiter) waitForResources(created ResourceList, waitRetries int) error {
w.log("beginning wait for %d resources with timeout of %v, maximum retries %d", len(created), w.timeout, waitRetries)

ctx, cancel := context.WithTimeout(context.Background(), w.timeout)
defer cancel()

numberOfErrors := make([]int, len(created))
for i := range numberOfErrors {
numberOfErrors[i] = 0
}

return wait.PollUntilContextCancel(ctx, 2*time.Second, true, func(ctx context.Context) (bool, error) {
for _, v := range created {
for i, v := range created {
ready, err := w.c.IsReady(ctx, v)
if !ready || err != nil {

if waitRetries > 0 && w.isRetryableError(err, v) {
numberOfErrors[i]++
if numberOfErrors[i] > waitRetries {
w.log("Max number of retries reached")
return false, err
}
w.log("Retrying as current number of retries %d less than max number of retries %d", numberOfErrors[i]-1, waitRetries)
return false, nil
}
numberOfErrors[i] = 0
if !ready {
return false, err
}
}
return true, nil
})
}

func (w *waiter) isRetryableError(err error, resource *resource.Info) bool {
if err == nil {
return false
}
w.log("Error received when checking status of resource %s. Error: '%s', Resource details: '%s'", resource.Name, err, resource)
if ev, ok := err.(*apierrors.StatusError); ok {
statusCode := ev.Status().Code
retryable := statusCode >= 500
w.log("Status code received: %d. Retryable error? %t", statusCode, retryable)
return retryable
}
w.log("Retryable error? %t", true)
return true
}

// waitForDeletedResources polls to check if all the resources are deleted or a timeout is reached
func (w *waiter) waitForDeletedResources(deleted ResourceList) error {
w.log("beginning wait for %d resources to be deleted with timeout of %v", len(deleted), w.timeout)
Expand Down

0 comments on commit fc74964

Please sign in to comment.