Skip to content

Commit

Permalink
Merge pull request #150 from dansteen/fixed_batch_job_checker
Browse files Browse the repository at this point in the history
Fixed batch job checker
  • Loading branch information
jrasell committed Apr 25, 2018
2 parents f96a3ce + 9ba2772 commit 5854763
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 109 deletions.
112 changes: 29 additions & 83 deletions levant/job_status_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
// initially discovered as part of the deployment.
const initialTaskHealth = "unknown"

// TaskCoordinate is a coordinate for an allocation/task combination
type TaskCoordinate struct {
Alloc string
TaskName string
}

// jobStatusChecker checks the status of a job at least reaches a status of
// running. Depending on the type of job and its configuration it can go through
// more checks.
Expand Down Expand Up @@ -77,14 +83,11 @@ func (l *levantDeployment) simpleJobStatusChecker(jobID string) bool {
// jobs that do not support Nomad deployments.
func (l *levantDeployment) jobAllocationChecker(evalID *string) bool {

// Track if we experience any dead tasks in a dumb way.
var deadTasks int

j := l.config.Job.Name
q := &nomad.QueryOptions{WaitIndex: 1}

// Build our small internal checking struct.
levantTasks := l.buildAllocationChecker(evalID)
levantTasks := make(map[TaskCoordinate]string)

for {

Expand All @@ -104,106 +107,49 @@ func (l *levantDeployment) jobAllocationChecker(evalID *string) bool {
// If we get here, set the wi to the latest Index.
q.WaitIndex = meta.LastIndex

allocationStatusChecker(levantTasks, allocs, &deadTasks)
complete, deadTasks := allocationStatusChecker(levantTasks, allocs)

// depending on how we finished up we report our status
// If we have no allocations left to track then we can exit and log
// information depending on the success.
if len(levantTasks) == 0 && deadTasks == 0 {
if complete && deadTasks == 0 {
logging.Info("levant/job_status_checker: all allocations in deployment of job %s are running", *j)
return true
} else if len(levantTasks) == 0 && deadTasks > 0 {
} else if complete && deadTasks > 0 {
return false
}
}
}

func (l *levantDeployment) buildAllocationChecker(evalID *string) map[string]map[string]string {

// Create our map to track allocations and tasks within the evaluation created
// by the job registration.
levantTasks := make(map[string]map[string]string)

q := &nomad.QueryOptions{WaitIndex: 1}

// We use a for loop here as, during testing, I have observed Levant runs
// faster than Nomad and so without a blocking query a pure GET request can
// trigger before Nomad builds the allocation.
for {

// Pull the latest information abou the evaluation allocations from Nomad.
allocs, meta, err := l.nomad.Evaluations().Allocations(*evalID, q)
if err != nil {
logging.Error("levant/job_status_checker: unable to query evaluation for allocations: %v", err)
return nil
}

// If the LastIndex is not greater than our stored LastChangeIndex, we don't
// need to do anything.
if meta.LastIndex <= q.WaitIndex {
continue
}

// If we get here, set the wi to the latest Index.
q.WaitIndex = meta.LastIndex

// Iterate over each allocation which can contain multiple tasks in order to
// build our object of tasks to check.
for _, alloc := range allocs {
for taskName := range alloc.TaskStates {

// If we have not seen the allocation previously we need to init the map.
if levantTasks[alloc.ID] == nil {
levantTasks[alloc.ID] = make(map[string]string)
}

// Set the task health to our initial status of Unknown.
levantTasks[alloc.ID][taskName] = initialTaskHealth
}
}

if len(levantTasks) == 0 {
continue
}

return levantTasks
}
}

// allocationStatusChecker is used to check the state of allocations within a
// job deployment, an update Levants internal tracking on task status based on
// this. This functionality exists as Nomad does not currently support
// deployments across all job types.
func allocationStatusChecker(levantTasks map[string]map[string]string, allocs []*nomad.AllocationListStub, deadTask *int) {
func allocationStatusChecker(levantTasks map[TaskCoordinate]string, allocs []*nomad.AllocationListStub) (bool, int) {

complete := true
deadTasks := 0

for _, alloc := range allocs {
for taskName, task := range alloc.TaskStates {
levantTasks[alloc.ID][taskName] = task.State

// If the task is running, remove it from tracking.
switch levantTasks[alloc.ID][taskName] {
case nomadStructs.TaskStateRunning:
logging.Info("levant/job_status_checker: task %s in allocation %s has reached %s state",
taskName, alloc.ID, nomadStructs.TaskStateRunning)
delete(levantTasks[alloc.ID], taskName)
// if the state is one we haven't seen yet then we print a message
if levantTasks[TaskCoordinate{alloc.ID, taskName}] != task.State {
logging.Info("levant/job_status_checker: task %s in allocation %s now in %s state",
taskName, alloc.ID, task.State)
// then we record the new state
levantTasks[TaskCoordinate{alloc.ID, taskName}] = task.State
}

// then we have some case specific actions
switch levantTasks[TaskCoordinate{alloc.ID, taskName}] {
// if a task is still pendign we are not yet done
case nomadStructs.TaskStatePending:
logging.Debug("levant/job_status_checker: task %s in allocation %s now in %s state",
taskName, alloc.ID, nomadStructs.TaskStatePending)

// If the task is dead, incrament the deadTask counter and remove the task
// from tracking.
complete = false
// if the task is dead we record that
case nomadStructs.TaskStateDead:
logging.Error("levant/job_status_checker: task %s in allocation %s now in %s state",
taskName, alloc.ID, nomadStructs.TaskStateDead)
*deadTask++
delete(levantTasks[alloc.ID], taskName)
}

// If we have no tasks left under the allocation to track remove the
// allocation from our tracker.
if len(levantTasks[alloc.ID]) == 0 {
delete(levantTasks, alloc.ID)
deadTasks++
}
}
}
return complete, deadTasks
}
43 changes: 17 additions & 26 deletions levant/job_status_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,10 @@ import (

func TestJobStatusChecker_allocationStatusChecker(t *testing.T) {

// Setup our LevantTask structures.
levantTasks1 := make(map[string]map[string]string)
levantTasks1["10246d87-ecd7-21ad-13b2-f0c564647d64"] = make(map[string]string)
levantTasks1["10246d87-ecd7-21ad-13b2-f0c564647d64"]["task1"] = initialTaskHealth

levantTasks2 := make(map[string]map[string]string)
levantTasks2["20246d87-ecd7-21ad-13b2-f0c564647d64"] = make(map[string]string)
levantTasks2["20246d87-ecd7-21ad-13b2-f0c564647d64"]["task1"] = initialTaskHealth
levantTasks2["20246d87-ecd7-21ad-13b2-f0c564647d64"]["task2"] = initialTaskHealth

levantTasks3 := make(map[string]map[string]string)
levantTasks3["30246d87-ecd7-21ad-13b2-f0c564647d64"] = make(map[string]string)
levantTasks3["30246d87-ecd7-21ad-13b2-f0c564647d64"]["task1"] = initialTaskHealth
// Build our task status maps
levantTasks1 := make(map[TaskCoordinate]string)
levantTasks2 := make(map[TaskCoordinate]string)
levantTasks3 := make(map[TaskCoordinate]string)

// Build a small AllocationListStubs with required information.
var allocs1 []*nomad.AllocationListStub
Expand Down Expand Up @@ -50,43 +41,43 @@ func TestJobStatusChecker_allocationStatusChecker(t *testing.T) {
})

cases := []struct {
levantTasks map[string]map[string]string
allocs []*nomad.AllocationListStub
dead int
expectedDead int
expectedAllocs int
levantTasks map[TaskCoordinate]string
allocs []*nomad.AllocationListStub
dead int
expectedDead int
expectedComplete bool
}{
{
levantTasks1,
allocs1,
0,
0,
0,
true,
},
{
levantTasks2,
allocs2,
0,
0,
1,
false,
},
{
levantTasks3,
allocs3,
0,
1,
0,
true,
},
}

for _, tc := range cases {
allocationStatusChecker(tc.levantTasks, tc.allocs, &tc.dead)
complete, dead := allocationStatusChecker(tc.levantTasks, tc.allocs)

if len(tc.levantTasks) != tc.expectedAllocs {
t.Fatalf("expected %v but got %v", tc.expectedAllocs, len(tc.levantTasks))
if complete != tc.expectedComplete {
t.Fatalf("expected complete to be %v but got %v", tc.expectedComplete, complete)
}
if tc.dead != tc.expectedDead {
t.Fatalf("expected %v dead task(s) but got %v", tc.expectedDead, tc.dead)
if dead != tc.expectedDead {
t.Fatalf("expected %v dead task(s) but got %v", tc.expectedDead, dead)
}
}
}

0 comments on commit 5854763

Please sign in to comment.