Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

state store: better handling of job deletion #19609

Merged
merged 13 commits into from Jan 12, 2024
3 changes: 3 additions & 0 deletions .changelog/19609.txt
@@ -0,0 +1,3 @@
```release-note:bug
state: Fixed a bug where purged jobs would not get new deployments
```
4 changes: 1 addition & 3 deletions nomad/core_sched_test.go
Expand Up @@ -546,7 +546,6 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
stoppedAlloc, lostAlloc,
activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobStoppedAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
},
[]*structs.Allocation{},
)
Expand Down Expand Up @@ -576,7 +575,6 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
stoppedAlloc, lostAlloc,
activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobStoppedAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
},
[]*structs.Allocation{},
)
Expand All @@ -598,7 +596,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
// than that of the job).
// 3. The active job remains since it is active, even though the allocations are otherwise
// eligible for GC. However, the inactive allocation is GCed for it.
// 4. The eval and allocation for the purged job are GCed.
// 4. The eval and allocation for the purged job are deleted.
assertCorrectJobEvalAlloc(
memdb.NewWatchSet(),
[]*structs.Job{deadJob, activeJob, stoppedJob},
Expand Down
78 changes: 78 additions & 0 deletions nomad/state/state_store.go
Expand Up @@ -1894,6 +1894,84 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return err
}

// Delete job deployments
deployments, err := s.DeploymentsByJobID(nil, namespace, job.ID, true)
if err != nil {
return fmt.Errorf("deployment lookup for job %s failed: %v", job.ID, err)
}

for _, deployment := range deployments {
existing, err := txn.First("deployment", "id", deployment.ID)
if err != nil {
return fmt.Errorf("deployment lookup failed: %v", err)
}
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
if existing == nil {
return fmt.Errorf("deployment not found")
}
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved

// Delete the deployment
if err := txn.Delete("deployment", existing); err != nil {
return fmt.Errorf("deployment delete failed: %v", err)
}
tgross marked this conversation as resolved.
Show resolved Hide resolved
if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

// Mark all "pending" evals for this job as "complete"
evals, err := s.EvalsByJob(nil, namespace, job.ID)
if err != nil {
return fmt.Errorf("eval lookup for job %s failed: %v", job.ID, err)
}

for _, eval := range evals {
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
return fmt.Errorf("eval lookup failed: %v", err)
}
if existing == nil {
continue
}

eval := existing.(*structs.Evaluation).Copy()
if eval.Status != structs.EvalStatusPending {
continue
}
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
eval.Status = structs.EvalStatusComplete
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved

// Insert the eval
if err := txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

// Delete job allocs
allocs, err := s.AllocsByJob(nil, namespace, job.ID, true)
if err != nil {
return fmt.Errorf("alloc lookup for job %s failed: %v", job.ID, err)
}

for _, alloc := range allocs {
existing, err := txn.First("allocs", "id", alloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
if existing == nil {
return fmt.Errorf("alloc not found")
}
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved

// Delete the alloc
if err := txn.Delete("allocs", existing); err != nil {
return fmt.Errorf("deployment delete failed: %v", err)
}
tgross marked this conversation as resolved.
Show resolved Hide resolved
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

// Cleanup plugins registered by this job, before we delete the summary
err = s.deleteJobFromPlugins(index, txn, job)
if err != nil {
Expand Down
23 changes: 1 addition & 22 deletions nomad/state/state_store_test.go
Expand Up @@ -4277,27 +4277,6 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) {
require.True(t, plug.ControllerRequired)
require.False(t, plug.IsEmpty())

updateAllocsFn(allocIDs, SERVER,
func(alloc *structs.Allocation) {
alloc.DesiredStatus = structs.AllocDesiredStatusStop
})

updateAllocsFn(allocIDs, CLIENT,
func(alloc *structs.Allocation) {
alloc.ClientStatus = structs.AllocClientStatusComplete
})

plug = checkPlugin(pluginCounts{
controllerFingerprints: 1,
nodeFingerprints: 2,
controllersHealthy: 1,
nodesHealthy: 2,
controllersExpected: 0,
nodesExpected: 0,
})
require.True(t, plug.ControllerRequired)
require.False(t, plug.IsEmpty())
tgross marked this conversation as resolved.
Show resolved Hide resolved

for _, node := range nodes {
updateNodeFn(node.ID, func(node *structs.Node) {
node.CSIControllerPlugins = nil
Expand Down Expand Up @@ -7128,7 +7107,7 @@ func TestStateStore_AllocsForRegisteredJob(t *testing.T) {
t.Fatalf("err: %v", err)
}

expected := len(allocs) + len(allocs1)
expected := len(allocs1) // state.DeleteJob corresponds to stop -purge, so all allocs from the original job should be gone
if len(out) != expected {
t.Fatalf("expected: %v, actual: %v", expected, len(out))
}
Expand Down