Support restarting failing job tasks
In order to make the jobs reconcilers work correctly with the restart
supervisor, they have been altered to never replace failed tasks
directly. Replacing failed tasks is the purview of the restart
supervisor; the jobs reconcilers will only create new tasks when needed.

Additionally, this alters the behavior of the replicated job reconciler
with regard to slots -- each new task will get a new slot, and when the
job is completed, there will be a Completed task in each slot from 0 to
TotalCompletions-1.

Then, it makes the tweaks necessary for the Restart Supervisor to support
jobs, which differ from other services in that they deliberately have a
desired state of Completed.

Finally, it wires up the replicated and global orchestrators to call the
restart supervisor to restart tasks that have failed.
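
In rough outline, the division of labor looks like this (a minimal sketch
with simplified types, not the real swarmkit API):

	package sketch

	// TaskState values are ordered: any state past Completed means the
	// task terminated without completing.
	type TaskState int

	const (
		Running TaskState = iota
		Completed
		Shutdown
		Failed
	)

	type Task struct {
		State        TaskState
		DesiredState TaskState
	}

	// reconcile creates tasks where none exist, but never replaces failed
	// tasks itself; it hands failures to the restart supervisor, which
	// owns restart policy (delays, retry limits, and so on).
	func reconcile(tasks []Task, restartFn func(Task)) {
		for _, t := range tasks {
			if t.DesiredState <= Completed && t.State > Completed {
				restartFn(t)
			}
		}
	}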

Signed-off-by: Drew Erny <drew.erny@docker.com>
dperny committed Sep 30, 2019
1 parent 438cdc0 commit bfd10e1
Showing 9 changed files with 792 additions and 117 deletions.
manager/orchestrator/jobs/fakes_test.go (124 additions, 0 deletions)
@@ -0,0 +1,124 @@
package jobs

import (
	"context"
	"sync"
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/orchestrator"
	"github.com/docker/swarmkit/manager/orchestrator/restart"
	"github.com/docker/swarmkit/manager/orchestrator/taskinit"
	"github.com/docker/swarmkit/manager/state/store"
)

// fakes_test.go is just a file to hold all of the test fakes used with the
// orchestrator, so that they don't pollute the test files

// fakeReconciler implements the reconciler interface for testing the
// orchestrator.
type fakeReconciler struct {
	sync.Mutex

	// serviceErrors contains a mapping of ids to errors that should be
	// returned if that ID is passed to reconcileService
	serviceErrors map[string]error

	// servicesReconciled is a list, in order, of all values this function has
	// been called with, including those that would return errors.
	servicesReconciled []string

	// servicesRelated is a set of all service IDs of services passed to
	// IsRelatedService
	servicesRelated []string
}

// ReconcileService implements the reconciler's ReconcileService method, but
// just records what arguments it has been passed, and maybe also returns an
// error if desired.
func (f *fakeReconciler) ReconcileService(id string) error {
	f.Lock()
	defer f.Unlock()
	f.servicesReconciled = append(f.servicesReconciled, id)
	if err, ok := f.serviceErrors[id]; ok {
		return err
	}
	return nil
}

func (f *fakeReconciler) getServicesReconciled() []string {
	f.Lock()
	defer f.Unlock()
	// we can't just return the slice, because then we'd be accessing it
	// outside of the protection of the mutex anyway. instead, we'll copy its
	// contents. this is fine because this is only the tests, and the slice is
	// almost certainly rather short.
	returnSet := make([]string, len(f.servicesReconciled))
	copy(returnSet, f.servicesReconciled)
	return returnSet
}

func (f *fakeReconciler) getRelatedServices() []string {
	f.Lock()
	defer f.Unlock()

	returnSet := make([]string, len(f.servicesRelated))
	copy(returnSet, f.servicesRelated)
	return returnSet
}

// finally, a few stubs to implement the InitHandler interface, just so types
// work out

func (f *fakeReconciler) IsRelatedService(s *api.Service) bool {
	f.Lock()
	defer f.Unlock()
	if s != nil {
		f.servicesRelated = append(f.servicesRelated, s.ID)
	}
	return true
}

func (f *fakeReconciler) FixTask(_ context.Context, _ *store.Batch, _ *api.Task) {}

func (f *fakeReconciler) SlotTuple(_ *api.Task) orchestrator.SlotTuple {
	return orchestrator.SlotTuple{}
}

// fakeRestartSupervisor implements the restart.SupervisorInterface interface.
// All of its methods are currently stubs, as it exists mostly to ensure that
// a real restart.Supervisor is not instantiated in the unit tests.
type fakeRestartSupervisor struct{}

func (f *fakeRestartSupervisor) Restart(_ context.Context, _ store.Tx, _ *api.Cluster, _ *api.Service, _ api.Task) error {
	return nil
}

func (f *fakeRestartSupervisor) UpdatableTasksInSlot(_ context.Context, _ orchestrator.Slot, _ *api.Service) orchestrator.Slot {
	return orchestrator.Slot{}
}

func (f *fakeRestartSupervisor) RecordRestartHistory(_ orchestrator.SlotTuple, _ *api.Task) {}

func (f *fakeRestartSupervisor) DelayStart(_ context.Context, _ store.Tx, _ *api.Task, _ string, _ time.Duration, _ bool) <-chan struct{} {
	return make(chan struct{})
}

func (f *fakeRestartSupervisor) StartNow(_ store.Tx, _ string) error {
	return nil
}

func (f *fakeRestartSupervisor) Cancel(_ string) {}

func (f *fakeRestartSupervisor) CancelAll() {}

func (f *fakeRestartSupervisor) ClearServiceHistory(_ string) {}
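
// Go interfaces are satisfied implicitly, so the stubs above are everything
// restart.SupervisorInterface requires; a compile-time check such as
// `var _ restart.SupervisorInterface = &fakeRestartSupervisor{}` would catch
// this fake drifting out of sync with the real interface.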

// fakeCheckTasksFunc is a function to use as checkTasksFunc when unit testing
// the orchestrator. it will create a service with ID fakeCheckTasksFuncCalled
// and call ih.IsRelatedService with that service, allowing a roundabout way
// to ensure it's been called.
func fakeCheckTasksFunc(_ context.Context, _ *store.MemoryStore, _ store.ReadTx, ih taskinit.InitHandler, _ restart.SupervisorInterface) error {
	ih.IsRelatedService(&api.Service{ID: "fakeCheckTasksFuncCalled"})
	return nil
}
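
These fakes are driven by the orchestrator's unit tests. As a minimal sketch
of their intended use (the test name and assertions are illustrative, not
part of this commit, and assume the standard errors and testing imports),
seeding serviceErrors and reading back the recorded calls looks like this:

	func TestFakeReconcilerRecordsCalls(t *testing.T) {
		f := &fakeReconciler{
			serviceErrors: map[string]error{"bad": errors.New("reconcile failed")},
		}
		// both successful and failing calls are recorded, in order
		if err := f.ReconcileService("good"); err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if err := f.ReconcileService("bad"); err == nil {
			t.Fatal("expected an error for service \"bad\"")
		}
		if got := f.getServicesReconciled(); len(got) != 2 || got[0] != "good" || got[1] != "bad" {
			t.Fatalf("unexpected reconcile order: %v", got)
		}
	}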
manager/orchestrator/jobs/global/reconciler.go (137 additions, 5 deletions)
@@ -1,6 +1,7 @@
package global

import (
	"context"
	"fmt"

	"github.com/docker/swarmkit/api"
@@ -10,14 +11,28 @@ import (
	gogotypes "github.com/gogo/protobuf/types"
)

// restartSupervisor is an interface representing the methods from the
// restart.SupervisorInterface that are actually needed by the reconciler. This
// more limited interface allows us to write a less ugly fake for unit testing.
type restartSupervisor interface {
	Restart(context.Context, store.Tx, *api.Cluster, *api.Service, api.Task) error
}
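
// Note that *restart.Supervisor satisfies this interface implicitly, so the
// orchestrator can hand the reconciler its real supervisor while unit tests
// substitute a fake; an assertion such as
// `var _ restartSupervisor = (*restart.Supervisor)(nil)` would verify the
// signatures stay in sync at compile time.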

// Reconciler is an object that manages reconciliation of global jobs. It is
// blocking and non-asynchronous, for ease of testing. It implements the
// Reconciler interface from the orchestrator package above it, and the
// taskinit.InitHandler interface.
type Reconciler struct {
	store *store.MemoryStore

	restart restartSupervisor
}

// NewReconciler creates a new global job reconciler.
-func NewReconciler(store *store.MemoryStore) *Reconciler {
+func NewReconciler(store *store.MemoryStore, restart restartSupervisor) *Reconciler {
	return &Reconciler{
-		store: store,
+		store:   store,
+		restart: restart,
	}
}

@@ -112,13 +127,22 @@ func (r *Reconciler) ReconcileService(id string) error {
	}

	var candidateNodes []string
	var invalidNodes []string
	for _, node := range nodes {
		// instead of having a big ugly multi-line boolean expression in the
		// if-statement, we'll have several if-statements, and bail out of
		// this loop iteration with continue if the node is not acceptable
		if !constraint.NodeMatches(constraints, node) {
			continue
		}

		// if a node is invalid, we should remove any tasks that might be on it
		if orchestrator.InvalidNode(node) {
			fmt.Printf("node %v is invalid (availability: %v)\n", node.ID, node.Spec.Availability)
			invalidNodes = append(invalidNodes, node.ID)
			continue
		}

		if node.Spec.Availability != api.NodeAvailabilityActive {
			continue
		}
@@ -141,18 +165,28 @@ func (r *Reconciler) ReconcileService(id string) error {
	// ID mapping, so that we're just doing 2x linear operation, instead of a
	// quadratic operation.
	nodeToTask := map[string]string{}
	// additionally, while we're iterating through tasks, if any of those tasks
	// are failed, we'll hand them to the restart supervisor to handle
	restartTasks := []string{}
	for _, task := range tasks {
-		// only match tasks belonging to this job iteration, and running or
-		// completed, which are not desired to be shut down
+		// match all tasks belonging to this job iteration which are in desired
+		// state completed, including failed tasks. We only want to create
+		// tasks for nodes on which there are no existing tasks.
		if task.JobIteration != nil &&
			task.JobIteration.Index == service.JobStatus.JobIteration.Index &&
-			task.Status.State <= api.TaskStateCompleted &&
			task.DesiredState <= api.TaskStateCompleted {
+			// we already know the task is desired to be executing (because its
+			// desired state is Completed). Check here to see if it's already
+			// failed, so we can restart it
+			if task.Status.State > api.TaskStateCompleted {
+				restartTasks = append(restartTasks, task.ID)
+			}
			nodeToTask[task.NodeID] = task.ID
		}
	}

	return r.store.Batch(func(batch *store.Batch) error {
		// first, create any new tasks required.
		for _, node := range candidateNodes {
			// check if there is a task for this node ID. If not, then we need
			// to create one.
@@ -169,6 +203,104 @@ func (r *Reconciler) ReconcileService(id string) error {
				}
			}
		}

		// then, restart any tasks that are failed
		for _, taskID := range restartTasks {
			if err := batch.Update(func(tx store.Tx) error {
				// get the latest version of the task for the restart
				t := store.GetTask(tx, taskID)
				// if it's deleted, nothing to do
				if t == nil {
					return nil
				}

				// if it's not still desired to be running, then don't restart
				// it.
				if t.DesiredState > api.TaskStateCompleted {
					return nil
				}

				// Finally, restart it
				// TODO(dperny): pass in context to ReconcileService, so we can
				// pass it in here.
				return r.restart.Restart(context.Background(), tx, cluster, service, *t)
			}); err != nil {
				// TODO(dperny): probably should log like in the other
				// orchestrators instead of returning here.
				return err
			}
		}

		// finally, shut down any tasks on invalid nodes
		for _, nodeID := range invalidNodes {
			fmt.Printf("checking node %v for tasks", nodeID)
			if taskID, ok := nodeToTask[nodeID]; ok {
				fmt.Printf("node %v has task %v", nodeID, taskID)
				if err := batch.Update(func(tx store.Tx) error {
					t := store.GetTask(tx, taskID)
					if t == nil {
						return nil
					}
					// if the task is still desired to be running, and is still
					// actually running, then it still needs to be shut down.
					if t.DesiredState > api.TaskStateCompleted || t.Status.State <= api.TaskStateRunning {
						t.DesiredState = api.TaskStateShutdown
						return store.UpdateTask(tx, t)
					}
					return nil
				}); err != nil {
					return err
				}
			}
		}
		return nil
	})
}
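
// Note on the state comparisons used above: api.TaskState is an ordered
// enum (New < ... < Running < Completed < Shutdown < Failed < Rejected), so
// `Status.State > api.TaskStateCompleted` is a compact test for "terminated
// without completing", while `DesiredState <= api.TaskStateCompleted` means
// the task is still expected to run to completion.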

// IsRelatedService returns true if the service is a global job. This method
// fulfills the taskinit.InitHandler interface. Because it is just a wrapper
// around a well-tested function call, it has no tests of its own.
func (r *Reconciler) IsRelatedService(service *api.Service) bool {
	return orchestrator.IsGlobalJob(service)
}

// FixTask validates that a task is compliant with the rest of the cluster
// state, and fixes it if it's not. This covers two main scenarios:
//
//   * The node that the task is running on is now paused or drained. We do
//     not need to check if the node still meets constraints -- that is the
//     purview of the constraint enforcer.
//   * The task has failed and needs to be restarted.
//
// This implements the FixTask method of the taskinit.InitHandler interface.
func (r *Reconciler) FixTask(ctx context.Context, batch *store.Batch, t *api.Task) {
	// tasks already desired to be shut down need no action.
	if t.DesiredState > api.TaskStateCompleted {
		return
	}

	batch.Update(func(tx store.Tx) error {
		node := store.GetNode(tx, t.NodeID)
		// if the node is no longer a valid node for this task, we need to
		// shut it down
		if orchestrator.InvalidNode(node) {
			task := store.GetTask(tx, t.ID)
			if task != nil && task.DesiredState < api.TaskStateShutdown {
				task.DesiredState = api.TaskStateShutdown
				return store.UpdateTask(tx, task)
			}
		}
		// we will reconcile all services after fixing the tasks, so we don't
		// need to restart tasks right now; we'll do so after this.
		return nil
	})
}

// SlotTuple returns a slot tuple representing this task. It implements the
// taskinit.InitHandler interface.
func (r *Reconciler) SlotTuple(t *api.Task) orchestrator.SlotTuple {
	return orchestrator.SlotTuple{
		ServiceID: t.ServiceID,
		NodeID:    t.NodeID,
	}
}
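
To tie the pieces together, here is a sketch of how the reconciler might be
wired to a real restart supervisor (illustrative only; the actual wiring
lives in the jobs orchestrator, the service ID is made up, and this assumes
restart.NewSupervisor takes the memory store as its argument):

	package main

	import (
		"github.com/docker/swarmkit/manager/orchestrator/jobs/global"
		"github.com/docker/swarmkit/manager/orchestrator/restart"
		"github.com/docker/swarmkit/manager/state/store"
	)

	func main() {
		s := store.NewMemoryStore(nil)
		defer s.Close()

		// the supervisor owns all restart decisions; the reconciler only
		// hands failed tasks over to it
		supervisor := restart.NewSupervisor(s)

		// NewReconciler accepts anything with a matching Restart method, so
		// unit tests can pass a fake supervisor instead
		r := global.NewReconciler(s, supervisor)

		if err := r.ReconcileService("someServiceID"); err != nil {
			// handle or log the reconciliation error
		}
	}

Because restartSupervisor is an unexported single-method interface, the
reconciler depends on exactly one behavior of the supervisor, which keeps
its unit-test fake small.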