Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

state store: better handling of job deletion #19609

Merged
merged 13 commits into from Jan 12, 2024
3 changes: 3 additions & 0 deletions .changelog/19609.txt
@@ -0,0 +1,3 @@
```release-note:bug
state: Fixed a bug where purged jobs would not get new deployments
```
4 changes: 1 addition & 3 deletions nomad/core_sched_test.go
Expand Up @@ -546,7 +546,6 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
stoppedAlloc, lostAlloc,
activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobStoppedAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
},
[]*structs.Allocation{},
)
Expand Down Expand Up @@ -576,7 +575,6 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
stoppedAlloc, lostAlloc,
activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobStoppedAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
},
[]*structs.Allocation{},
)
Expand All @@ -598,7 +596,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
// than that of the job).
// 3. The active job remains since it is active, even though the allocations are otherwise
// eligible for GC. However, the inactive allocation is GCed for it.
// 4. The eval and allocation for the purged job are GCed.
// 4. The eval and allocation for the purged job are deleted.
assertCorrectJobEvalAlloc(
memdb.NewWatchSet(),
[]*structs.Job{deadJob, activeJob, stoppedJob},
Expand Down
120 changes: 118 additions & 2 deletions nomad/state/state_store.go
Expand Up @@ -806,6 +806,18 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
txn := s.db.WriteTxn(index)
defer txn.Abort()

err := s.DeleteDeploymentTxn(index, deploymentIDs, txn)
if err == nil {
return txn.Commit()
}

return err
}

// DeleteDeploymentTxn is used to delete a set of deployments by ID, like
// DeleteDeployment but in a transaction. Useful when making multiple
// modifications atomically.
func (s *StateStore) DeleteDeploymentTxn(index uint64, deploymentIDs []string, txn Txn) error {
if len(deploymentIDs) == 0 {
return nil
}
Expand All @@ -817,7 +829,7 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
return fmt.Errorf("deployment lookup failed: %v", err)
}
if existing == nil {
return fmt.Errorf("deployment not found")
continue
}

// Delete the deployment
Expand All @@ -830,7 +842,50 @@ func (s *StateStore) DeleteDeployment(index uint64, deploymentIDs []string) erro
return fmt.Errorf("index update failed: %v", err)
}

return txn.Commit()
return nil
}

// DeleteAlloc is used to delete a set of allocations by ID
func (s *StateStore) DeleteAlloc(index uint64, allocIDs []string) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()

err := s.DeleteAllocTxn(index, allocIDs, txn)
if err == nil {
return txn.Commit()
}

return err
}

// DeleteAllocTxn is used to delete a set of allocs by ID, like DeleteALloc but
// in a transaction. Useful when making multiple modifications atomically.
func (s *StateStore) DeleteAllocTxn(index uint64, allocIDs []string, txn Txn) error {
if len(allocIDs) == 0 {
return nil
}

for _, allocID := range allocIDs {
// Lookup the alloc
existing, err := txn.First("allocs", "id", allocID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
if existing == nil {
continue
}

// Delete the alloc
if err := txn.Delete("allocs", existing); err != nil {
return fmt.Errorf("alloc delete failed: %v", err)
}
}

if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return nil
}

// UpsertScalingEvent is used to insert a new scaling event.
Expand Down Expand Up @@ -1894,6 +1949,67 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return err
}

// Delete job deployments
deployments, err := s.DeploymentsByJobID(nil, namespace, job.ID, true)
if err != nil {
return fmt.Errorf("deployment lookup for job %s failed: %v", job.ID, err)
}

deploymentIDs := []string{}
for _, d := range deployments {
deploymentIDs = append(deploymentIDs, d.ID)
}

if err := s.DeleteDeploymentTxn(index, deploymentIDs, txn); err != nil {
return err
}

// Mark all "pending" evals for this job as "complete"
evals, err := s.EvalsByJob(nil, namespace, job.ID)
if err != nil {
return fmt.Errorf("eval lookup for job %s failed: %v", job.ID, err)
}

for _, eval := range evals {
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
return fmt.Errorf("eval lookup failed: %v", err)
}
if existing == nil {
continue
}

eval := existing.(*structs.Evaluation).Copy()
if eval.Status != structs.EvalStatusPending {
continue
}
pkazmierczak marked this conversation as resolved.
Show resolved Hide resolved
eval.Status = structs.EvalStatusComplete
lgfa29 marked this conversation as resolved.
Show resolved Hide resolved
eval.StatusDescription = fmt.Sprintf("job %s deleted", job.ID)

// Insert the eval
if err := txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"evals", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}

// Delete job allocs
allocs, err := s.AllocsByJob(nil, namespace, job.ID, true)
if err != nil {
return fmt.Errorf("alloc lookup for job %s failed: %v", job.ID, err)
}

allocIDs := []string{}
for _, a := range allocs {
allocIDs = append(allocIDs, a.ID)
}

if err := s.DeleteAllocTxn(index, allocIDs, txn); err != nil {
return err
}

// Cleanup plugins registered by this job, before we delete the summary
err = s.deleteJobFromPlugins(index, txn, job)
if err != nil {
Expand Down
23 changes: 1 addition & 22 deletions nomad/state/state_store_test.go
Expand Up @@ -4277,27 +4277,6 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) {
require.True(t, plug.ControllerRequired)
require.False(t, plug.IsEmpty())

updateAllocsFn(allocIDs, SERVER,
func(alloc *structs.Allocation) {
alloc.DesiredStatus = structs.AllocDesiredStatusStop
})

updateAllocsFn(allocIDs, CLIENT,
func(alloc *structs.Allocation) {
alloc.ClientStatus = structs.AllocClientStatusComplete
})

plug = checkPlugin(pluginCounts{
controllerFingerprints: 1,
nodeFingerprints: 2,
controllersHealthy: 1,
nodesHealthy: 2,
controllersExpected: 0,
nodesExpected: 0,
})
require.True(t, plug.ControllerRequired)
require.False(t, plug.IsEmpty())
tgross marked this conversation as resolved.
Show resolved Hide resolved

for _, node := range nodes {
updateNodeFn(node.ID, func(node *structs.Node) {
node.CSIControllerPlugins = nil
Expand Down Expand Up @@ -7128,7 +7107,7 @@ func TestStateStore_AllocsForRegisteredJob(t *testing.T) {
t.Fatalf("err: %v", err)
}

expected := len(allocs) + len(allocs1)
expected := len(allocs1) // state.DeleteJob corresponds to stop -purge, so all allocs from the original job should be gone
if len(out) != expected {
t.Fatalf("expected: %v, actual: %v", expected, len(out))
}
Expand Down