Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deployment watcher index usage #4709

Merged
merged 3 commits into from
Sep 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 50 additions & 60 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ func (w *deploymentWatcher) SetAllocHealth(
if j != nil {
resp.RevertedJobVersion = helper.Uint64ToPtr(j.Version)
}
w.setLatestEval(index)
return nil
}

Expand Down Expand Up @@ -265,7 +264,6 @@ func (w *deploymentWatcher) PromoteDeployment(
resp.EvalCreateIndex = index
resp.DeploymentModifyIndex = index
resp.Index = index
w.setLatestEval(index)
return nil
}

Expand Down Expand Up @@ -297,7 +295,6 @@ func (w *deploymentWatcher) PauseDeployment(
}
resp.DeploymentModifyIndex = i
resp.Index = i
w.setLatestEval(i)
return nil
}

Expand Down Expand Up @@ -347,7 +344,6 @@ func (w *deploymentWatcher) FailDeployment(
if rollbackJob != nil {
resp.RevertedJobVersion = helper.Uint64ToPtr(rollbackJob.Version)
}
w.setLatestEval(i)
return nil
}

Expand Down Expand Up @@ -490,10 +486,8 @@ FAIL:
// Update the status of the deployment to failed and create an evaluation.
e := w.getEval()
u := w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc)
if index, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
if _, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
w.logger.Error("failed to update deployment status", "error", err)
} else {
w.setLatestEval(index)
}
}

Expand All @@ -512,7 +506,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
var res allocUpdateResult

// Get the latest evaluation index
latestEval, err := w.latestEvalIndex()
latestEval, blocked, err := w.jobEvalStatus()
if err != nil {
if err == context.Canceled || w.ctx.Err() == context.Canceled {
return res, err
Expand All @@ -528,19 +522,20 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
continue
}

// Nothing to do for this allocation
if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.ModifyIndex <= latestEval {
continue
}

// Determine if the update stanza for this group is progress based
progressBased := dstate.ProgressDeadline != 0

// Check if the allocation has failed and we need to mark it for allow
// replacements
if progressBased && alloc.DeploymentStatus.IsUnhealthy() &&
deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
res.allowReplacements = append(res.allowReplacements, alloc.ID)
continue
}

// We need to create an eval so the job can progress.
if alloc.DeploymentStatus.IsHealthy() {
if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
res.createEval = true
} else if progressBased && alloc.DeploymentStatus.IsUnhealthy() && deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
res.allowReplacements = append(res.allowReplacements, alloc.ID)
}

// If the group is using a progress deadline, we don't have to do anything.
Expand Down Expand Up @@ -685,10 +680,8 @@ func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forI
w.l.Unlock()

// Create the eval
if index, err := w.createUpdate(replacements, w.getEval()); err != nil {
if _, err := w.createUpdate(replacements, w.getEval()); err != nil {
w.logger.Error("failed to create evaluation for deployment", "deployment_id", w.deploymentID, "error", err)
} else {
w.setLatestEval(index)
}
})
}
Expand Down Expand Up @@ -764,71 +757,68 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS
return nil, 0, err
}

maxIndex := uint64(0)
stubs := make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
stubs = append(stubs, alloc.Stub())

if maxIndex < alloc.ModifyIndex {
maxIndex = alloc.ModifyIndex
}
}

// Use the last index that affected the jobs table
index, err := state.Index("allocs")
if err != nil {
return nil, index, err
// Use the last index that affected the allocs table
if len(stubs) == 0 {
index, err := state.Index("allocs")
if err != nil {
return nil, index, err
}
maxIndex = index
}

return stubs, index, nil
return stubs, maxIndex, nil
}

// latestEvalIndex returns the index of the last evaluation created for
// the job. The index is used to determine if an allocation update requires an
// evaluation to be triggered.
func (w *deploymentWatcher) latestEvalIndex() (uint64, error) {
// jobEvalStatus returns the eval status for a job. It returns the index of the
// last evaluation created for the job, as well as whether there exists a
// blocked evaluation for the job. The index is used to determine if an
// allocation update requires an evaluation to be triggered. If there already is
// a blocked evaluations, no eval should be created.
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) {
if err := w.queryLimiter.Wait(w.ctx); err != nil {
return 0, err
return 0, false, err
}

snap, err := w.state.Snapshot()
if err != nil {
return 0, err
return 0, false, err
}

evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID)
if err != nil {
return 0, err
return 0, false, err
}

if len(evals) == 0 {
idx, err := snap.Index("evals")
if err != nil {
w.setLatestEval(idx)
}

return idx, err
index, err := snap.Index("evals")
return index, false, err
}

// Prefer using the snapshot index. Otherwise use the create index
e := evals[0]
if e.SnapshotIndex != 0 {
w.setLatestEval(e.SnapshotIndex)
return e.SnapshotIndex, nil
}

w.setLatestEval(e.CreateIndex)
return e.CreateIndex, nil
}
var max uint64
for _, eval := range evals {
// If we have a blocked eval, then we do not care what the index is
// since we will not need to make a new eval.
if eval.ShouldBlock() {
return 0, true, nil
}

// setLatestEval sets the given index as the latest eval unless the currently
// stored index is higher.
func (w *deploymentWatcher) setLatestEval(index uint64) {
w.l.Lock()
defer w.l.Unlock()
if index > w.latestEval {
w.latestEval = index
// Prefer using the snapshot index. Otherwise use the create index
if eval.SnapshotIndex != 0 && max < eval.SnapshotIndex {
max = eval.SnapshotIndex
} else if max < eval.CreateIndex {
max = eval.CreateIndex
}
}
}

// getLatestEval returns the latest eval index.
func (w *deploymentWatcher) getLatestEval() uint64 {
w.l.Lock()
defer w.l.Unlock()
return w.latestEval
return max, false, nil
}
12 changes: 12 additions & 0 deletions nomad/drainer/watch_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
}

// Capture the allocs for each draining job.
var maxIndex uint64 = 0
resp := make(map[structs.NamespacedID][]*structs.Allocation, l)
for jns := range draining {
allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false)
Expand All @@ -455,6 +456,17 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
}

resp[jns] = allocs
for _, alloc := range allocs {
if maxIndex < alloc.ModifyIndex {
maxIndex = alloc.ModifyIndex
}
}
}

// Prefer using the actual max index of affected allocs since it means less
// unblocking
if maxIndex != 0 {
index = maxIndex
}

return resp, index, nil
Expand Down
10 changes: 10 additions & 0 deletions nomad/drainer/watch_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
return nil, 0, err
}

var maxIndex uint64 = 0
resp := make(map[string]*structs.Node, 64)
for {
raw := iter.Next()
Expand All @@ -244,6 +245,15 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto

node := raw.(*structs.Node)
resp[node.ID] = node
if maxIndex < node.ModifyIndex {
maxIndex = node.ModifyIndex
}
}

// Prefer using the actual max index of affected nodes since it means less
// unblocking
if maxIndex != 0 {
index = maxIndex
}

return resp, index, nil
Expand Down
1 change: 1 addition & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2825,6 +2825,7 @@ func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.Appl
copy.DeploymentStatus.Healthy = helper.BoolToPtr(healthy)
copy.DeploymentStatus.Timestamp = ts
copy.DeploymentStatus.ModifyIndex = index
copy.ModifyIndex = index

if err := s.updateDeploymentWithAlloc(index, copy, old, txn); err != nil {
return fmt.Errorf("error updating deployment: %v", err)
Expand Down