deploymentwatcher: fail early whenever possible (#17341)
Given a deployment that has a `progress_deadline`, if a task group runs
out of reschedule attempts, allow the deployment to fail at that point
instead of waiting until the `progress_deadline` is reached.

Fixes: #17260
nicoche committed Jun 26, 2023
1 parent d20faf5 commit a9135bc
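
To make the behavior concrete, here is a minimal sketch (not part of the commit; the policy and tracker values are invented for illustration) of how the `RescheduleTracker.RescheduleEligible` helper introduced in `nomad/structs/structs.go` below decides whether any reschedule attempts remain:

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// A policy that allows a single reschedule attempt per 30-minute interval.
	policy := &structs.ReschedulePolicy{
		Attempts:  1,
		Interval:  30 * time.Minute,
		Unlimited: false,
	}

	now := time.Now()

	// One reschedule already happened inside the interval: the attempt budget
	// is spent, so there is nothing left to wait for.
	spent := &structs.RescheduleTracker{
		Events: []*structs.RescheduleEvent{
			{RescheduleTime: now.Add(-5 * time.Minute).UnixNano()},
		},
	}
	fmt.Println(spent.RescheduleEligible(policy, now)) // false

	// The only recorded reschedule is older than the interval, so another
	// attempt is still available and the deployment should keep waiting.
	stale := &structs.RescheduleTracker{
		Events: []*structs.RescheduleEvent{
			{RescheduleTime: now.Add(-2 * time.Hour).UnixNano()},
		},
	}
	fmt.Println(stale.RescheduleEligible(policy, now)) // true
}
```

When the helper returns false for an unhealthy allocation of an active deployment, the watcher's new `shouldFailEarly` check fails the deployment immediately instead of waiting out the `progress_deadline`.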
Showing 4 changed files with 166 additions and 50 deletions.
3 changes: 3 additions & 0 deletions .changelog/17341.txt
@@ -0,0 +1,3 @@
```release-note:improvement
deploymentwatcher: Allow deployments to fail early when running out of reschedule attempts
```
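
The watcher-side check builds on that helper. The sketch below is a hypothetical condensation, not code from this commit (the real logic is `deploymentWatcher.shouldFailEarly` in the diff that follows, which also handles groups without a progress deadline):

```go
package deploymentwatcher

import (
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// canFailEarly is a hypothetical summary of the new fail-early rule: an
// unhealthy allocation that has exhausted its reschedule attempts means the
// deployment cannot recover, so waiting for progress_deadline is pointless.
func canFailEarly(stub *structs.AllocListStub, policy *structs.ReschedulePolicy) bool {
	return stub.DeploymentStatus.IsUnhealthy() &&
		!stub.RescheduleEligible(policy, time.Now())
}
```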
40 changes: 29 additions & 11 deletions nomad/deploymentwatcher/deployment_watcher.go
@@ -615,12 +615,12 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
continue
}

// Determine if the update block for this group is progress based
progressBased := dstate.ProgressDeadline != 0
// Check if we can already fail the deployment
failDeployment := w.shouldFailEarly(deployment, alloc, dstate)

// Check if the allocation has failed and we need to mark it to allow
// replacements
if progressBased && alloc.DeploymentStatus.IsUnhealthy() &&
if alloc.DeploymentStatus.IsUnhealthy() && !failDeployment &&
deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
res.allowReplacements = append(res.allowReplacements, alloc.ID)
continue
@@ -631,19 +631,12 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
res.createEval = true
}

// If the group is using a progress deadline, we don't have to do anything.
if progressBased {
continue
}

// Fail on the first bad allocation
if alloc.DeploymentStatus.IsUnhealthy() {
if failDeployment {
// Check if the group has autorevert set
if dstate.AutoRevert {
res.rollback = true
}

// Since we have an unhealthy allocation, fail the deployment
res.failDeployment = true
}

@@ -702,6 +695,31 @@ func (w *deploymentWatcher) shouldFail() (fail, rollback bool, err error) {
return fail, false, nil
}

func (w *deploymentWatcher) shouldFailEarly(deployment *structs.Deployment, alloc *structs.AllocListStub, dstate *structs.DeploymentState) bool {
if !alloc.DeploymentStatus.IsUnhealthy() {
return false
}

// Fail on the first unhealthy allocation if no progress deadline is specified.
if dstate.ProgressDeadline == 0 {
w.logger.Debug("failing deployment because an allocation failed and the deployment is not progress based", "alloc", alloc.ID)
return true
}

if deployment.Active() {
reschedulePolicy := w.j.LookupTaskGroup(alloc.TaskGroup).ReschedulePolicy
isRescheduleEligible := alloc.RescheduleEligible(reschedulePolicy, time.Now())
if !isRescheduleEligible {
// We have run out of reschedule attempts: do not wait for the progress deadline to expire because
// we know we will not be able to get another allocation healthy
w.logger.Debug("failing deployment because an allocation has failed and the task group has run out of reschedule attempts", "alloc", alloc.ID)
return true
}
}

return false
}

// getDeploymentProgressCutoff returns the progress cutoff for the given
// deployment
func (w *deploymentWatcher) getDeploymentProgressCutoff(d *structs.Deployment) time.Time {
85 changes: 85 additions & 0 deletions nomad/deploymentwatcher/deployments_watcher_test.go
@@ -16,6 +16,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
mocker "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -1784,6 +1785,90 @@ func TestDeploymentWatcher_Watch_StartWithoutProgressDeadline(t *testing.T) {
})
}

// Test that we exit before hitting the Progress Deadline when we run out of reschedule attempts
// for a failing deployment
func TestDeploymentWatcher_Watch_FailEarly(t *testing.T) {
ci.Parallel(t)
w, m := testDeploymentWatcher(t, 1000.0, 1*time.Millisecond)

// Create a job, alloc, and a deployment
j := mock.Job()
j.TaskGroups[0].Update = structs.DefaultUpdateStrategy.Copy()
j.TaskGroups[0].Update.MaxParallel = 2
j.TaskGroups[0].Update.ProgressDeadline = 500 * time.Millisecond
// Disable rescheduling so the deployment only ever gets its original allocation
j.TaskGroups[0].ReschedulePolicy.Attempts = 0
j.TaskGroups[0].ReschedulePolicy.Unlimited = false
j.Stable = true
d := mock.Deployment()
d.JobID = j.ID
d.TaskGroups["web"].ProgressDeadline = 500 * time.Millisecond
d.TaskGroups["web"].RequireProgressBy = time.Now().Add(d.TaskGroups["web"].ProgressDeadline)
a := mock.Alloc()
now := time.Now()
a.CreateTime = now.UnixNano()
a.ModifyTime = now.UnixNano()
a.DeploymentID = d.ID
must.Nil(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j), must.Sprint("UpsertJob"))
must.Nil(t, m.state.UpsertDeployment(m.nextIndex(), d), must.Sprint("UpsertDeployment"))
must.Nil(t, m.state.UpsertAllocs(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a}), must.Sprint("UpsertAllocs"))

// Require a call to UpdateDeploymentStatus
c := &matchDeploymentStatusUpdateConfig{
DeploymentID: d.ID,
Status: structs.DeploymentStatusFailed,
StatusDescription: structs.DeploymentStatusDescriptionFailedAllocations,
Eval: true,
}
m2 := matchDeploymentStatusUpdateRequest(c)
m.On("UpdateDeploymentStatus", mocker.MatchedBy(m2)).Return(nil)

w.SetEnabled(true, m.state)
testutil.WaitForResult(func() (bool, error) { return 1 == watchersCount(w), nil },
func(err error) { must.Eq(t, 1, watchersCount(w), must.Sprint("Should have 1 deployment")) })

// Update the alloc to be unhealthy
a2 := a.Copy()
a2.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(false),
Timestamp: now,
}
must.Nil(t, m.state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, m.nextIndex(), []*structs.Allocation{a2}))

// Wait for the deployment to be failed
testutil.WaitForResult(func() (bool, error) {
d, err := m.state.DeploymentByID(nil, d.ID)
if err != nil {
return false, err
}

if d.Status != structs.DeploymentStatusFailed {
return false, fmt.Errorf("bad status %q", d.Status)
}

return d.StatusDescription == structs.DeploymentStatusDescriptionFailedAllocations, fmt.Errorf("bad status description %q", d.StatusDescription)
}, func(err error) {
t.Fatal(err)
})

// Require that there is only one evaluation
testutil.WaitForResult(func() (bool, error) {
ws := memdb.NewWatchSet()
evals, err := m.state.EvalsByJob(ws, j.Namespace, j.ID)
if err != nil {
return false, err
}

if l := len(evals); l != 1 {
return false, fmt.Errorf("Got %d evals; want 1", l)
}

return true, nil
}, func(err error) {
t.Fatal(err)
})
}

// Tests that the watcher fails rollback when the spec hasn't changed
func TestDeploymentWatcher_RollbackFailed(t *testing.T) {
ci.Parallel(t)
88 changes: 49 additions & 39 deletions nomad/structs/structs.go
@@ -10164,6 +10164,46 @@ func (rt *RescheduleTracker) Copy() *RescheduleTracker {
return nt
}

func (rt *RescheduleTracker) RescheduleEligible(reschedulePolicy *ReschedulePolicy, failTime time.Time) bool {
if reschedulePolicy == nil {
return false
}
attempts := reschedulePolicy.Attempts
enabled := attempts > 0 || reschedulePolicy.Unlimited
if !enabled {
return false
}
if reschedulePolicy.Unlimited {
return true
}
// Early return true if there are no attempts yet and the number of allowed attempts is > 0
if (rt == nil || len(rt.Events) == 0) && attempts > 0 {
return true
}
attempted, _ := rt.rescheduleInfo(reschedulePolicy, failTime)
return attempted < attempts
}

func (rt *RescheduleTracker) rescheduleInfo(reschedulePolicy *ReschedulePolicy, failTime time.Time) (int, int) {
if reschedulePolicy == nil {
return 0, 0
}
attempts := reschedulePolicy.Attempts
interval := reschedulePolicy.Interval

attempted := 0
if rt != nil && attempts > 0 {
for j := len(rt.Events) - 1; j >= 0; j-- {
lastAttempt := rt.Events[j].RescheduleTime
timeDiff := failTime.UTC().UnixNano() - lastAttempt
if timeDiff < interval.Nanoseconds() {
attempted += 1
}
}
}
return attempted, attempts
}

// RescheduleEvent is used to keep track of previous attempts at rescheduling an allocation
type RescheduleEvent struct {
// RescheduleTime is the timestamp of a reschedule attempt
@@ -10598,47 +10638,11 @@ func (a *Allocation) ShouldReschedule(reschedulePolicy *ReschedulePolicy, failTi
// RescheduleEligible returns whether the allocation is eligible to be rescheduled according
// to its ReschedulePolicy and the current state of its reschedule tracker
func (a *Allocation) RescheduleEligible(reschedulePolicy *ReschedulePolicy, failTime time.Time) bool {
if reschedulePolicy == nil {
return false
}
attempts := reschedulePolicy.Attempts
enabled := attempts > 0 || reschedulePolicy.Unlimited
if !enabled {
return false
}
if reschedulePolicy.Unlimited {
return true
}
// Early return true if there are no attempts yet and the number of allowed attempts is > 0
if (a.RescheduleTracker == nil || len(a.RescheduleTracker.Events) == 0) && attempts > 0 {
return true
}
attempted, _ := a.rescheduleInfo(reschedulePolicy, failTime)
return attempted < attempts
}

func (a *Allocation) rescheduleInfo(reschedulePolicy *ReschedulePolicy, failTime time.Time) (int, int) {
if reschedulePolicy == nil {
return 0, 0
}
attempts := reschedulePolicy.Attempts
interval := reschedulePolicy.Interval

attempted := 0
if a.RescheduleTracker != nil && attempts > 0 {
for j := len(a.RescheduleTracker.Events) - 1; j >= 0; j-- {
lastAttempt := a.RescheduleTracker.Events[j].RescheduleTime
timeDiff := failTime.UTC().UnixNano() - lastAttempt
if timeDiff < interval.Nanoseconds() {
attempted += 1
}
}
}
return attempted, attempts
return a.RescheduleTracker.RescheduleEligible(reschedulePolicy, failTime)
}

func (a *Allocation) RescheduleInfo() (int, int) {
return a.rescheduleInfo(a.ReschedulePolicy(), a.LastEventTime())
return a.RescheduleTracker.rescheduleInfo(a.ReschedulePolicy(), a.LastEventTime())
}

// LastEventTime is the time of the last task event in the allocation.
@@ -10696,7 +10700,7 @@ func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *Re
rescheduleEligible := reschedulePolicy.Unlimited || (reschedulePolicy.Attempts > 0 && a.RescheduleTracker == nil)
if reschedulePolicy.Attempts > 0 && a.RescheduleTracker != nil && a.RescheduleTracker.Events != nil {
// Check for eligibility based on the interval if max attempts is set
attempted, attempts := a.rescheduleInfo(reschedulePolicy, failTime)
attempted, attempts := a.RescheduleTracker.rescheduleInfo(reschedulePolicy, failTime)
rescheduleEligible = attempted < attempts && nextDelay < reschedulePolicy.Interval
}
return nextRescheduleTime, rescheduleEligible
@@ -11174,6 +11178,12 @@ func (a *AllocListStub) SetEventDisplayMessages() {
setDisplayMsg(a.TaskStates)
}

// RescheduleEligible returns whether the allocation is eligible to be rescheduled according
// to its ReschedulePolicy and the current state of its reschedule tracker
func (a *AllocListStub) RescheduleEligible(reschedulePolicy *ReschedulePolicy, failTime time.Time) bool {
return a.RescheduleTracker.RescheduleEligible(reschedulePolicy, failTime)
}

func setDisplayMsg(taskStates map[string]*TaskState) {
for _, taskState := range taskStates {
for _, event := range taskState.Events {
