Skip to content

Commit

Permalink
Add support for updating global jobs
Browse files Browse the repository at this point in the history
Adds support for updating global jobs by adding code to shut down tasks
belonging to previous job iterations.

Signed-off-by: Drew Erny <drew.erny@docker.com>
  • Loading branch information
dperny committed Jan 7, 2020
1 parent 6ad2213 commit 39596e5
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
19 changes: 16 additions & 3 deletions manager/orchestrator/jobs/global/reconciler.go
Expand Up @@ -138,7 +138,6 @@ func (r *Reconciler) ReconcileService(id string) error {

// if a node is invalid, we should remove any tasks that might be on it
if orchestrator.InvalidNode(node) {
fmt.Printf("node %v is invalid (availability: %v)\n", node.ID, node.Spec.Availability)
invalidNodes = append(invalidNodes, node.ID)
continue
}
Expand Down Expand Up @@ -185,6 +184,21 @@ func (r *Reconciler) ReconcileService(id string) error {
}
}

// finally, we should identify any tasks belonging to the old iteration of
// the service and move those to desired state "shutdown"
shutdownTasks := []*api.Task{}
for _, task := range tasks {
// this is any task not belonging to this iteration and not already
// desired to shutdown. this also includes any successfully completed
// tasks, but to ensure maximum compatibility with the other
// orchestration components, we will still set the desired state of
// those to Shutdown.
if task.JobIteration.Index < service.JobStatus.JobIteration.Index &&
task.DesiredState != api.TaskStateShutdown {
shutdownTasks = append(shutdownTasks, task)
}
}

return r.store.Batch(func(batch *store.Batch) error {
// first, create any new tasks required.
for _, node := range candidateNodes {
Expand Down Expand Up @@ -233,9 +247,7 @@ func (r *Reconciler) ReconcileService(id string) error {

// finally, shut down any tasks on invalid nodes
for _, nodeID := range invalidNodes {
fmt.Printf("checking node %v for tasks", nodeID)
if taskID, ok := nodeToTask[nodeID]; ok {
fmt.Printf("node %v has task %v", nodeID, taskID)
if err := batch.Update(func(tx store.Tx) error {
t := store.GetTask(tx, taskID)
if t == nil {
Expand All @@ -253,6 +265,7 @@ func (r *Reconciler) ReconcileService(id string) error {
}
}
}

return nil
})
}
Expand Down
4 changes: 3 additions & 1 deletion manager/orchestrator/jobs/global/reconciler_test.go
Expand Up @@ -61,6 +61,9 @@ var _ = Describe("Global Job Reconciler", func() {
service = &api.Service{
ID: serviceID,
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "someService",
},
Mode: &api.ServiceSpec_GlobalJob{
// GlobalJob has no parameters
GlobalJob: &api.GlobalJob{},
Expand Down Expand Up @@ -425,7 +428,6 @@ var _ = Describe("Global Job Reconciler", func() {
})
})
})

})

Describe("FixTask", func() {
Expand Down

0 comments on commit 39596e5

Please sign in to comment.