Navigation Menu

Skip to content

Commit

Permalink
Refactor logic
Browse files Browse the repository at this point in the history
Use mutex to lock the action to report the upstream function
Wrap logic to report to upstream function in the function reportToPerformUpgrade

Signed-off-by: Stephane Moser <moser.sts@gmail.com>
  • Loading branch information
Moser-ss committed Jul 4, 2021
1 parent 027cea4 commit d7833eb
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions pkg/action/upgrade.go
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -39,6 +40,8 @@ import (
"helm.sh/helm/v3/pkg/storage/driver"
)

var mutex sync.Mutex

// Upgrade is the action for upgrading releases.
//
// It provides the implementation of 'helm upgrade'.
Expand Down Expand Up @@ -105,7 +108,7 @@ type Upgrade struct {
DependencyUpdate bool
}

type ResultMessage struct {
type resultMessage struct {
r *release.Release
e error
}
Expand Down Expand Up @@ -314,32 +317,40 @@ func (u *Upgrade) performUpgrade(originalRelease, upgradedRelease *release.Relea
if err := u.cfg.Releases.Create(upgradedRelease); err != nil {
return nil, err
}
rChan := make(chan ResultMessage)
rChan := make(chan resultMessage)
go u.releasingUpgrade(rChan, upgradedRelease, current, target, originalRelease)
go u.handleSignals(rChan, upgradedRelease)
result := <-rChan

return result.r, result.e
}

func (u *Upgrade) handleSignals(c chan<- ResultMessage, upgradedRelease *release.Release) {
func (u *Upgrade) reportToPerformUpgrade(c chan<- resultMessage, rel *release.Release, created kube.ResourceList, err error) {
mutex.Lock()
if err != nil {
rel, err = u.failRelease(rel, created, err)
}
c <- resultMessage{r: rel, e: err}
mutex.Unlock()
}
func (u *Upgrade) handleSignals(c chan<- resultMessage, upgradedRelease *release.Release) {
// Handle SIGINT
cSignal := make(chan os.Signal)
signal.Notify(cSignal, os.Interrupt, syscall.SIGTERM)
go func() {
<-cSignal
u.cfg.Log("SIGTERM or SIGINT received")
r, e := u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("SIGTERM or SIGINT received, release failed"))
c <- ResultMessage{r: r, e: e}
// when the atomic flag is set the ongoing release finish first and doesn't give time for the rollback happens . I need to think in a way to lock the chanel
// Implement function reportToPerformUpgrade(channel, Release, ResourceList, error) if error != nill call u.failRelease
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("SIGTERM or SIGINT received, release failed"))
}()
}
func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
func (u *Upgrade) releasingUpgrade(c chan<- resultMessage, upgradedRelease *release.Release, current kube.ResourceList, target kube.ResourceList, originalRelease *release.Release) {
// pre-upgrade hooks

if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPreUpgrade, u.Timeout); err != nil {
r, e := u.failRelease(upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
c <- ResultMessage{r: r, e: e}
u.reportToPerformUpgrade(c, upgradedRelease, kube.ResourceList{}, fmt.Errorf("pre-upgrade hooks failed: %s", err))
return
}
} else {
Expand All @@ -349,8 +360,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele
results, err := u.cfg.KubeClient.Update(current, target, u.Force)
if err != nil {
u.cfg.recordRelease(originalRelease)
r, e := u.failRelease(upgradedRelease, results.Created, err)
c <- ResultMessage{r: r, e: e}
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}

Expand All @@ -368,15 +378,13 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele
if u.WaitForJobs {
if err := u.cfg.KubeClient.WaitWithJobs(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
r, e := u.failRelease(upgradedRelease, results.Created, err)
c <- ResultMessage{r: r, e: e}
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
} else {
if err := u.cfg.KubeClient.Wait(target, u.Timeout); err != nil {
u.cfg.recordRelease(originalRelease)
r, e := u.failRelease(upgradedRelease, results.Created, err)
c <- ResultMessage{r: r, e: e}
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, err)
return
}
}
Expand All @@ -385,8 +393,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele
// post-upgrade hooks
if !u.DisableHooks {
if err := u.cfg.execHook(upgradedRelease, release.HookPostUpgrade, u.Timeout); err != nil {
r, e := u.failRelease(upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
c <- ResultMessage{r: r, e: e}
u.reportToPerformUpgrade(c, upgradedRelease, results.Created, fmt.Errorf("post-upgrade hooks failed: %s", err))
return
}
}
Expand All @@ -400,7 +407,7 @@ func (u *Upgrade) releasingUpgrade(c chan<- ResultMessage, upgradedRelease *rele
} else {
upgradedRelease.Info.Description = "Upgrade complete"
}
c <- ResultMessage{r: upgradedRelease, e: nil}
u.reportToPerformUpgrade(c, upgradedRelease, nil, nil)
}

func (u *Upgrade) failRelease(rel *release.Release, created kube.ResourceList, err error) (*release.Release, error) {
Expand Down

0 comments on commit d7833eb

Please sign in to comment.