Fix some jobs TODOs
Addresses a couple of TODOs in the jobs orchestrator.

Most notably, updates the reconcilers to set the DesiredState of Tasks
belonging to older job iterations to Remove, which will cause them to be
cleaned up and deleted.

Signed-off-by: Drew Erny <drew.erny@docker.com>
dperny committed Jan 7, 2020
1 parent f61b9f7 commit 5367704
Showing 7 changed files with 282 additions and 86 deletions.
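The heart of the change is a simple rule: any task whose JobIteration no longer matches the service's current iteration gets its desired state set to Remove, and tasks in that state are then cleaned up and deleted. A standalone Go sketch of that rule (field names simplified from SwarmKit's API; an illustration, not the actual reconciler code):

package main

import "fmt"

// Task mirrors only the fields the rule needs; SwarmKit's real Task type
// is much larger.
type Task struct {
	ID           string
	JobIteration uint64
	DesiredState string
}

// markStale flags tasks from older job iterations for removal. Tasks whose
// desired state is Remove are later cleaned up and deleted.
func markStale(tasks []*Task, current uint64) {
	for _, t := range tasks {
		if t.JobIteration != current && t.DesiredState != "REMOVE" {
			t.DesiredState = "REMOVE"
		}
	}
}

func main() {
	tasks := []*Task{
		{ID: "old-task", JobIteration: 0, DesiredState: "RUNNING"},
		{ID: "new-task", JobIteration: 1, DesiredState: "COMPLETED"},
	}
	markStale(tasks, 1)
	for _, t := range tasks {
		fmt.Println(t.ID, t.DesiredState) // old-task REMOVE, new-task COMPLETED
	}
}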
7 changes: 4 additions & 3 deletions api/types.pb.go

Some generated files are not rendered by default.

7 changes: 4 additions & 3 deletions api/types.proto
@@ -1125,8 +1125,9 @@ message JobStatus {
 	Version job_iteration = 1 [(gogoproto.nullable) = false];
 
 	// LastExecution is the time that the job was last executed. This is set by
-	// the orchestrator in the same transaction that JobIteration is incremented,
-	// so that we can affirmatively determine which tasks were created as part of
-	// the last execution.
+	// the orchestrator in the same transaction that JobIteration is incremented.
+	// While time is a fungible concept in distributed systems like Swarmkit,
+	// this value gives us a best-effort attempt to prevent weird behavior like
+	// newly added nodes executing long-forgotten jobs.
 	google.protobuf.Timestamp last_execution = 2;
 }
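To make the intent of the new comment concrete: the global reconciler (below) compares each node's creation time against LastExecution and only schedules the job on nodes that already existed when it last ran. A minimal, self-contained sketch of that comparison, with plain time.Time values standing in for the protobuf timestamps:

package main

import (
	"fmt"
	"time"
)

// eligible reports whether a node should run a global job: only nodes
// created before the job's last execution are considered. Clock skew can
// still misclassify a node, so this is a best-effort guard, not a proof.
func eligible(nodeCreated, lastExecution time.Time) bool {
	return nodeCreated.Before(lastExecution)
}

func main() {
	lastExec := time.Date(2020, 1, 7, 12, 0, 0, 0, time.UTC)
	oldNode := lastExec.Add(-24 * time.Hour) // existed when the job last ran
	newNode := lastExec.Add(time.Hour)       // joined afterwards

	fmt.Println(eligible(oldNode, lastExec)) // true: schedule the job here
	fmt.Println(eligible(newNode, lastExec)) // false: don't run a stale job
}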
71 changes: 48 additions & 23 deletions manager/orchestrator/jobs/global/reconciler.go
@@ -108,21 +108,20 @@ func (r *Reconciler) ReconcileService(id string) error {
 	// earth-shattering, and the clock skew can't be _that_ bad because the PKI
 	// wouldn't work if it was. so this is just a best effort endeavor. we'll
 	// schedule to any node that says its creation time is before the
-	// LastExecution time.
-	//
-	// TODO(dperny): this only covers the case of nodes that were added after a
-	// global job is created. if nodes are paused or drained when a global job
-	// is started, then when they become un-paused or un-drained, the job will
-	// execute on them. i'm unsure if at this point it's better to accept and
-	// document this behavior, or spend rather a lot of time and energy coming
-	// up with a fix.
+	// LastExecution time. This prevents odd behavior like long-forgotten
+	// global jobs executing on newly added nodes. However, this does not
+	// prevent cases where older nodes were unavailable at job execution time
+	// and subsequently become available.
 	lastExecution, err := gogotypes.TimestampFromProto(service.JobStatus.LastExecution)
 	if err != nil {
-		// TODO(dperny): validate that lastExecution is set on service creation
-		// or update.
+		// LastExecution is always updated on service create or update.
+		// However, to guard against the worst case scenario, we can fall back
+		// onto the service meta CreatedAt value.
 		lastExecution, err = gogotypes.TimestampFromProto(service.Meta.CreatedAt)
 		if err != nil {
-			panic(fmt.Sprintf("service CreateAt time could not be parsed: %v", err))
+			// if we can't get this, however, then we might as well die,
+			// because it's a Very Big error.
+			panic(fmt.Sprintf("service CreatedAt time could not be parsed: %v", err))
 		}
 	}

@@ -138,7 +137,6 @@ func (r *Reconciler) ReconcileService(id string) error {
 
 		// if a node is invalid, we should remove any tasks that might be on it
 		if orchestrator.InvalidNode(node) {
-			fmt.Printf("node %v is invalid (availability: %v)\n", node.ID, node.Spec.Availability)
 			invalidNodes = append(invalidNodes, node.ID)
 			continue
 		}
@@ -168,20 +166,30 @@ func (r *Reconciler) ReconcileService(id string) error {
 	// additionally, while we're iterating through tasks, if any of those tasks
 	// are failed, we'll hand them to the restart supervisor to handle
 	restartTasks := []string{}
+	// and if there are any tasks belonging to old job iterations, set them to
+	// be removed
+	removeTasks := []string{}
 	for _, task := range tasks {
 		// match all tasks belonging to this job iteration which are in desired
 		// state completed, including failed tasks. We only want to create
 		// tasks for nodes on which there are no existing tasks.
-		if task.JobIteration != nil &&
-			task.JobIteration.Index == service.JobStatus.JobIteration.Index &&
-			task.DesiredState <= api.TaskStateCompleted {
-			// we already know the task is desired to be executing (because its
-			// desired state is Completed). Check here to see if it's already
-			// failed, so we can restart it
-			if task.Status.State > api.TaskStateCompleted {
-				restartTasks = append(restartTasks, task.ID)
-			}
-			nodeToTask[task.NodeID] = task.ID
-		}
+		if task.JobIteration != nil {
+			if task.JobIteration.Index == service.JobStatus.JobIteration.Index &&
+				task.DesiredState <= api.TaskStateCompleted {
+				// we already know the task is desired to be executing (because its
+				// desired state is Completed). Check here to see if it's already
+				// failed, so we can restart it
+				if task.Status.State > api.TaskStateCompleted {
+					restartTasks = append(restartTasks, task.ID)
+				}
+				nodeToTask[task.NodeID] = task.ID
+			}
+
+			if task.JobIteration.Index != service.JobStatus.JobIteration.Index {
+				if task.DesiredState != api.TaskStateRemove {
+					removeTasks = append(removeTasks, task.ID)
+				}
+			}
+		}
 	}

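The comparisons in this hunk (task.DesiredState <= api.TaskStateCompleted, task.Status.State > api.TaskStateCompleted) rely on api.TaskState being an ordered enum: states up to and including Completed mean the task is meant to run to completion, and anything past Completed is terminal. A toy illustration of that ordering trick (the constants here are simplified, not SwarmKit's actual values):

package main

import "fmt"

// TaskState is an ordered progression, so range checks can be written as
// plain integer comparisons. Values are illustrative only.
type TaskState int

const (
	TaskStateNew TaskState = iota
	TaskStateReady
	TaskStateRunning
	TaskStateCompleted
	TaskStateFailed // past Completed: terminal, may need a restart
	TaskStateRemove // marked for deletion
)

func main() {
	desired, observed := TaskStateCompleted, TaskStateFailed

	// desired <= Completed: the task is still intended to run to completion
	fmt.Println(desired <= TaskStateCompleted) // true
	// observed > Completed: the task ended badly; candidate for restart
	fmt.Println(observed > TaskStateCompleted) // true
}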
@@ -231,11 +239,28 @@ func (r *Reconciler) ReconcileService(id string) error {
 			}
 		}
 
+		// remove tasks that need to be removed
+		for _, taskID := range removeTasks {
+			if err := batch.Update(func(tx store.Tx) error {
+				t := store.GetTask(tx, taskID)
+				if t == nil {
+					return nil
+				}
+
+				if t.DesiredState == api.TaskStateRemove {
+					return nil
+				}
+
+				t.DesiredState = api.TaskStateRemove
+				return store.UpdateTask(tx, t)
+			}); err != nil {
+				return err
+			}
+		}
+
 		// finally, shut down any tasks on invalid nodes
 		for _, nodeID := range invalidNodes {
-			fmt.Printf("checking node %v for tasks", nodeID)
 			if taskID, ok := nodeToTask[nodeID]; ok {
-				fmt.Printf("node %v has task %v", nodeID, taskID)
 				if err := batch.Update(func(tx store.Tx) error {
 					t := store.GetTask(tx, taskID)
 					if t == nil {
48 changes: 48 additions & 0 deletions manager/orchestrator/jobs/global/reconciler_test.go
@@ -180,6 +180,54 @@ var _ = Describe("Global Job Reconciler", func() {
 		Expect(err).ToNot(HaveOccurred())
 	})
 
+	When("the service is updated", func() {
+		var allTasks []*api.Task
+
+		JustBeforeEach(func() {
+			// this JustBeforeEach will run after the one where the service
+			// etc is created and reconciled.
+			err := s.Update(func(tx store.Tx) error {
+				service := store.GetService(tx, serviceID)
+				service.JobStatus.JobIteration.Index++
+				service.Spec.Task.ForceUpdate++
+				return store.UpdateService(tx, service)
+			})
+			Expect(err).ToNot(HaveOccurred())
+
+			err = r.ReconcileService(serviceID)
+			Expect(err).ToNot(HaveOccurred())
+
+			s.View(func(tx store.ReadTx) {
+				allTasks, err = store.FindTasks(tx, store.ByServiceID(serviceID))
+			})
+			Expect(err).ToNot(HaveOccurred())
+		})
+
+		It("should remove tasks belonging to old iterations", func() {
+			count := 0
+			for _, task := range allTasks {
+				Expect(task.JobIteration).ToNot(BeNil())
+				if task.JobIteration.Index == 0 {
+					Expect(task.DesiredState).To(Equal(api.TaskStateRemove))
+					count++
+				}
+			}
+			Expect(count).To(Equal(len(nodes)))
+		})
+
+		It("should create new tasks with the new iteration", func() {
+			count := 0
+			for _, task := range allTasks {
+				Expect(task.JobIteration).ToNot(BeNil())
+				if task.JobIteration.Index == 1 {
+					Expect(task.DesiredState).To(Equal(api.TaskStateCompleted))
+					count++
+				}
+			}
+			Expect(count).To(Equal(len(nodes)))
+		})
+	})
+
 	When("there are failed tasks", func() {
 		BeforeEach(func() {
 			// all but the last node should be filled.
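The comment in the new test ("this JustBeforeEach will run after the one where the service etc is created") leans on Ginkgo's setup ordering: all BeforeEach blocks run first, then JustBeforeEach blocks run outer-to-inner. A minimal sketch of that ordering (a hypothetical suite, Ginkgo v1 style as used by SwarmKit):

package ordering_test

import (
	"testing"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

func TestOrdering(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "Ordering")
}

var _ = Describe("setup ordering", func() {
	var order []string

	BeforeEach(func() { order = []string{"before"} })
	JustBeforeEach(func() { order = append(order, "outer-just") })

	When("nested", func() {
		JustBeforeEach(func() { order = append(order, "inner-just") })

		It("runs the outer JustBeforeEach before the inner one", func() {
			Expect(order).To(Equal([]string{"before", "outer-just", "inner-just"}))
		})
	})
})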
17 changes: 13 additions & 4 deletions manager/orchestrator/jobs/orchestrator.go
@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	"github.com/docker/swarmkit/api"
+	"github.com/docker/swarmkit/log"
 	"github.com/docker/swarmkit/manager/orchestrator"
 	"github.com/docker/swarmkit/manager/orchestrator/jobs/global"
 	"github.com/docker/swarmkit/manager/orchestrator/jobs/replicated"
@@ -77,6 +78,8 @@ func (o *Orchestrator) Run(ctx context.Context) {
 // made inside of Run, and is enclosed in a sync.Once to stop this from being
 // called multiple times
 func (o *Orchestrator) run(ctx context.Context) {
+	ctx = log.WithModule(ctx, "orchestrator/jobs")
+
 	// closing doneChan should be the absolute last thing that happens in this
 	// method, and so should be the absolute first thing we defer.
 	defer close(o.doneChan)
@@ -132,13 +135,19 @@ func (o *Orchestrator) run(ctx context.Context) {
 
 		for _, service := range services {
 			if orchestrator.IsReplicatedJob(service) {
-				// TODO(dperny): do something with the error result of
-				// ReconcileService
-				o.replicatedReconciler.ReconcileService(service.ID)
+				if err := o.replicatedReconciler.ReconcileService(service.ID); err != nil {
+					log.G(ctx).WithField(
+						"service.id", service.ID,
+					).WithError(err).Error("error reconciling replicated job")
+				}
 			}
 
 			if orchestrator.IsGlobalJob(service) {
-				o.globalReconciler.ReconcileService(service.ID)
+				if err := o.globalReconciler.ReconcileService(service.ID); err != nil {
+					log.G(ctx).WithField(
+						"service.id", service.ID,
+					).WithError(err).Error("error reconciling global job")
+				}
 			}
 		}
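The new error handling uses SwarmKit's log package, which is built on logrus: log.G(ctx) pulls a logger out of the context, and WithField/WithError attach structured fields rather than interpolating them into the message string. A minimal equivalent using logrus directly (hypothetical values; SwarmKit's log.G and WithModule add the context plumbing on top of this):

package main

import (
	"errors"

	"github.com/sirupsen/logrus"
)

func main() {
	err := errors.New("store unavailable")

	// Structured fields keep service.id and the error machine-parseable,
	// so log aggregators can filter on them directly.
	logrus.WithField("service.id", "someserviceid").
		WithError(err).
		Error("error reconciling replicated job")
}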
66 changes: 46 additions & 20 deletions manager/orchestrator/jobs/replicated/reconciler.go
@@ -105,6 +105,7 @@ func (r *Reconciler) ReconcileService(id string) error {
 	runningTasks := uint64(0)
 	completeTasks := uint64(0)
 	restartTasks := []string{}
+	removeTasks := []string{}
 
 	// for replicated jobs, each task will get a different slot number, so that
 	// when the job has completed, there will be one Completed task in every
@@ -116,27 +117,34 @@ func (r *Reconciler) ReconcileService(id string) error {
 	for _, task := range tasks {
 		// we only care about tasks from this job iteration. tasks from the
 		// previous job iteration are not important
-		// TODO(dperny): we need to stop any running tasks from older job
-		// iterations.
-		if task.JobIteration != nil && task.JobIteration.Index == jobVersion {
-			if task.Status.State == api.TaskStateCompleted {
-				completeTasks++
-				slots[task.Slot] = true
-			}
-
-			// the Restart Manager may put a task in the desired state Ready,
-			// so we should match not only tasks in desired state Completed,
-			// but also those in any valid running state.
-			if task.Status.State != api.TaskStateCompleted && task.DesiredState <= api.TaskStateCompleted {
-				runningTasks++
-				slots[task.Slot] = true
-
-				// if the task is in a terminal state, we might need to restart
-				// it. throw it on the pile if so. this is still counted as a
-				// running task for the purpose of determining how many new
-				// tasks to create.
-				if task.Status.State > api.TaskStateCompleted {
-					restartTasks = append(restartTasks, task.ID)
-				}
-			}
-		}
+		if task.JobIteration != nil {
+			if task.JobIteration.Index == jobVersion {
+				if task.Status.State == api.TaskStateCompleted {
+					completeTasks++
+					slots[task.Slot] = true
+				}
+
+				// the Restart Manager may put a task in the desired state Ready,
+				// so we should match not only tasks in desired state Completed,
+				// but also those in any valid running state.
+				if task.Status.State != api.TaskStateCompleted && task.DesiredState <= api.TaskStateCompleted {
+					runningTasks++
+					slots[task.Slot] = true
+
+					// if the task is in a terminal state, we might need to restart
+					// it. throw it on the pile if so. this is still counted as a
+					// running task for the purpose of determining how many new
+					// tasks to create.
+					if task.Status.State > api.TaskStateCompleted {
+						restartTasks = append(restartTasks, task.ID)
+					}
+				}
+			} else {
+				// tasks belonging to a previous iteration of the job may
+				// exist. if any such tasks exist, they should have their task
+				// state set to Remove
+				if task.Status.State <= api.TaskStateRunning && task.DesiredState != api.TaskStateRemove {
+					removeTasks = append(removeTasks, task.ID)
+				}
+			}
+		}
 	}
@@ -238,6 +246,24 @@ func (r *Reconciler) ReconcileService(id string) error {
 			}
 		}
 
+		for _, taskID := range removeTasks {
+			if err := batch.Update(func(tx store.Tx) error {
+				t := store.GetTask(tx, taskID)
+				if t == nil {
+					return nil
+				}
+
+				// don't do unnecessary updates
+				if t.DesiredState == api.TaskStateRemove {
+					return nil
+				}
+				t.DesiredState = api.TaskStateRemove
+				return store.UpdateTask(tx, t)
+			}); err != nil {
+				return err
+			}
+		}
+
 		return nil
 	})
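As the comment in this file's first hunk notes, a replicated job is complete when every slot has one Completed task; counting the slots that already hold a completed or still-running task tells the reconciler how many new tasks it must create. A toy sketch of that slot bookkeeping (simplified types, not the real reconciler):

package main

import "fmt"

// tasksToCreate returns how many new tasks a replicated job still needs:
// total slots minus slots already occupied by a completed or running task.
func tasksToCreate(totalSlots uint64, occupied map[uint64]bool) uint64 {
	var have uint64
	for slot := uint64(0); slot < totalSlots; slot++ {
		if occupied[slot] {
			have++
		}
	}
	return totalSlots - have
}

func main() {
	// slots 0 and 2 are accounted for; slot 1 still needs a task
	occupied := map[uint64]bool{0: true, 2: true}
	fmt.Println(tasksToCreate(3, occupied)) // 1
}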
