From d7833eb2b0a3a35b2ac926b4495b2c3ac0dac580 Mon Sep 17 00:00:00 2001 From: Stephane Moser Date: Thu, 7 Jan 2021 22:11:48 +0000 Subject: [PATCH] Refactor logic Use mutex to lock the action to report the upstream function Wrap logic to report to upstream function in the function reportToPerformUpgrade Signed-off-by: Stephane Moser --- pkg/action/upgrade.go | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/pkg/action/upgrade.go b/pkg/action/upgrade.go index db5ae346e61..8ac0ee714b1 100644 --- a/pkg/action/upgrade.go +++ b/pkg/action/upgrade.go @@ -23,6 +23,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "time" @@ -39,6 +40,8 @@ import ( "helm.sh/helm/v3/pkg/storage/driver" ) +var mutex sync.Mutex + // Upgrade is the action for upgrading releases. // // It provides the implementation of 'helm upgrade'. @@ -105,7 +108,7 @@ type Upgrade struct { DependencyUpdate bool } -type ResultMessage struct { +type resultMessage struct { r *release.Release e error } @@ -314,7 +317,7 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea if err := u.cfg.Releases.Create(upgradedRelease); err != nil { return nil, err } - rChan := make(chan ResultMessage) + rChan := make(chan resultMessage) go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease) go u.handleSignals(rChan, upgradedRelease) result := <-rChan @@ -322,24 +325,32 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea return result.r, result.e } -func (u *Upgrade) handleSignals(c chan<- ResultMessage, upgradedRelease *release.Release) { +func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) { + mutex.Lock() + if err != nil { + rel, err = u.failRelease(rel, created, err) + } + c <- resultMessage{r: rel, e: err} + mutex.Unlock() +} +func (u *Upgrade) handleSignals(c chan<- resultMessage, upgradedRelease *release.Release) { // Handle SIGINT cSignal := make(chan os.Signal) signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM) go func() { <-cSignal u.cfg.Log("SIGTERM or SIGINT received") - r, e := u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("SIGTERM or SIGINT received, release failed")) - c <- ResultMessage{r: r, e: e} + // when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens . I need to think in a way to lock the chanel + // Implement function reportToPerformUpgrade(channel, Release, ResourceList, error) if error != nill call u.failRelease + u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("SIGTERM or SIGINT received, release failed")) }() } -func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { +func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) { // pre-upgrade hooks if !u.DisableHooks { if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil { - r, e := u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err)) - c <- ResultMessage{r: r, e: e} + u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err)) return } } else { @@ -349,8 +360,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele results, err := u.cfg.KubeClient.Update(current, target, u.Force) if err != nil { u.cfg.recordRelease(originalRelease) - r, e := u.failRelease(upgradedRelease, results.Created, err) - c <- ResultMessage{r: r, e: e} + u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) return } @@ -368,15 +378,13 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele if u.WaitForJobs { if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) - r, e := u.failRelease(upgradedRelease, results.Created, err) - c <- ResultMessage{r: r, e: e} + u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) return } } else { if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil { u.cfg.recordRelease(originalRelease) - r, e := u.failRelease(upgradedRelease, results.Created, err) - c <- ResultMessage{r: r, e: e} + u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err) return } } @@ -385,8 +393,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele // post-upgrade hooks if !u.DisableHooks { if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil { - r, e := u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) - c <- ResultMessage{r: r, e: e} + u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err)) return } } @@ -400,7 +407,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele } else { upgradedRelease.Info.Description = "Upgrade complete" } - c <- ResultMessage{r: upgradedRelease, e: nil} + u.reportToPerformUpgrade(c, upgradedRelease, nil, nil) } func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) {