Add store event logic to replicated jobs orchestrator
Signed-off-by: Drew Erny <drew.erny@docker.com>
dperny committed Sep 11, 2019
1 parent 31672b7 commit 170ebc5
Showing 4 changed files with 193 additions and 58 deletions.
54 changes: 36 additions & 18 deletions manager/orchestrator/replicatedjob/orchestrator.go
@@ -15,10 +15,6 @@ type Orchestrator struct {
// we need the store, of course, to do updates
store *store.MemoryStore

// a copy of the cluster is needed, because we need it when creating tasks
// to set the default log driver
cluster *api.Cluster

// reconciler holds the logic of actually operating on a service.
reconciler reconciler

@@ -53,27 +49,21 @@ func (o *Orchestrator) Run(ctx context.Context) {
services []*api.Service
)

o.store.View(func(tx store.ReadTx) {
watchChan, cancel, _ := store.ViewAndWatch(o.store, func(tx store.ReadTx) error {
// TODO(dperny): figure out what to do about the error return value
// from FindServices
services, _ = store.FindServices(tx, store.All)

// there should only ever be 1 cluster object, but for reasons
// forgotten by me, it needs to be retrieved in a rather roundabout way
// from the store
// TODO(dperny): figure out what to do with this error too
clusters, _ := store.FindClusters(tx, store.All)
if len(clusters) == 1 {
o.cluster = clusters[0]
}
return nil
})

defer cancel()

// for testing purposes, if a reconciler already exists on the
// orchestrator, we will not set it up. this allows injecting a fake
// reconciler.
if o.reconciler == nil {
// the cluster might be nil, but that doesn't matter.
o.reconciler = newReconciler(o.store, o.cluster)
o.reconciler = newReconciler(o.store)
}

for _, service := range services {
@@ -84,9 +74,37 @@ func (o *Orchestrator) Run(ctx context.Context) {
}
}

// TODO(dperny): this will be a case in the main select loop, but for now
// just block until stopChan is closed.
<-o.stopChan
for {
// first, before taking any action, see if we should stop the
// orchestrator. if both the stop channel and the watch channel are
// available to read, the channel that gets read is picked at random,
// but we always want to stop if it's possible.
select {
case <-o.stopChan:
return
default:
}

select {
case event := <-watchChan:
var service *api.Service

switch ev := event.(type) {
case api.EventCreateService:
service = ev.Service
case api.EventUpdateService:
service = ev.Service
}

if service != nil {
o.reconciler.ReconcileService(service.ID)
}
case <-o.stopChan:
// we also need to check for stop in here, in case there are no
// updates to cause the loop to turn over.
return
}
}
}

// Stop stops the Orchestrator
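The new Run loop above takes an initial snapshot and subscribes to store events in one call to store.ViewAndWatch, which returns the watch channel and a cancel function, and then services that channel until Stop is called. The sketch below is a minimal, standalone illustration of the two-stage select shape used there, with plain channels standing in for swarmkit's store events; runLoop, events, and stop are illustrative names, not swarmkit API.

package main

import "fmt"

// runLoop mirrors the shape of Orchestrator.Run above: a non-blocking check
// of stop comes first, so a pending event can never be chosen over shutdown
// (a single select picks among ready cases at random), then a blocking select
// handles the next event or stops.
func runLoop(events <-chan string, stop <-chan struct{}) {
	for {
		// Prefer stopping if it has already been requested.
		select {
		case <-stop:
			return
		default:
		}

		select {
		case id := <-events:
			// Stand-in for reconciler.ReconcileService(id).
			fmt.Println("reconcile", id)
		case <-stop:
			// Needed here too, so the loop can exit even when no
			// further events arrive.
			return
		}
	}
}

func main() {
	events := make(chan string, 1)
	stop := make(chan struct{})
	done := make(chan struct{})

	go func() {
		runLoop(events, stop)
		close(done)
	}()

	events <- "service0"
	close(stop) // whether service0 is handled before stopping is not guaranteed
	<-done
}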
132 changes: 107 additions & 25 deletions manager/orchestrator/replicatedjob/orchestrator_test.go
@@ -6,6 +6,7 @@ import (

"context"
"fmt"
"sync"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/orchestrator/testutils"
@@ -15,6 +16,8 @@ import (
// fakeReconciler implements the reconciler interface for testing the
// orchestrator.
type fakeReconciler struct {
sync.Mutex

// serviceErrors contains a mapping of ids to errors that should be
// returned if that ID is passed to reconcileService
serviceErrors map[string]error
@@ -28,13 +31,27 @@ type fakeReconciler struct {
// just records what arguments it has been passed, and maybe also returns an
// error if desired.
func (f *fakeReconciler) ReconcileService(id string) error {
f.Lock()
defer f.Unlock()
f.servicesReconciled = append(f.servicesReconciled, id)
if err, ok := f.serviceErrors[id]; ok {
return err
}
return nil
}

func (f *fakeReconciler) getServicesReconciled() []string {
f.Lock()
defer f.Unlock()
// we can't just return the slice, because then we'd be accessing it
// outside of the protection of the mutex anyway. instead, we'll copy its
// contents. this is fine because this is only the tests, and the slice is
// almost certainly rather short.
returnSet := make([]string, len(f.servicesReconciled))
copy(returnSet, f.servicesReconciled)
return returnSet
}

var _ = Describe("Replicated job orchestrator", func() {
var (
o *Orchestrator
@@ -52,11 +69,14 @@ var _ = Describe("Replicated job orchestrator", func() {
})

Describe("Starting and stopping", func() {
It("should stop when Stop is called", func(done Done) {
It("should stop when Stop is called", func() {
stopped := testutils.EnsureRuns(func() { o.Run(context.Background()) })
o.Stop()
Expect(stopped).To(BeClosed())
close(done)
// Eventually here will repeatedly run the matcher against the
// argument. This means that we will keep checking if stopped is
// closed until the test times out. Using Eventually instead of
// Expect ensures we can't race on "stopped".
Eventually(stopped).Should(BeClosed())
})
})

@@ -94,20 +114,7 @@ var _ = Describe("Replicated job orchestrator", func() {
},
},
}
if err := store.CreateService(tx, globalJob); err != nil {
return err
}

cluster := &api.Cluster{
ID: "someCluster",
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: "someName",
},
},
}

return store.CreateCluster(tx, cluster)
return store.CreateService(tx, globalJob)
})

Expect(err).ToNot(HaveOccurred())
@@ -125,13 +132,6 @@ var _ = Describe("Replicated job orchestrator", func() {
o.Stop()
})

It("should pick up the cluster object", func() {
// this is a white-box test which looks to see that o.cluster is
// set correctly.
Expect(o.cluster).ToNot(BeNil())
Expect(o.cluster.ID).To(Equal("someCluster"))
})

It("should reconcile each replicated job service that already exists", func() {
Expect(f.servicesReconciled).To(ConsistOf(
"service0", "service1", "service2",
@@ -167,7 +167,89 @@ var _ = Describe("Replicated job orchestrator", func() {
})

Describe("receiving events", func() {
It("should reconcile each time it receives an event", func() {
var stopped <-chan struct{}
BeforeEach(func() {
stopped = testutils.EnsureRuns(func() { o.Run(context.Background()) })
})

AfterEach(func() {
// If a test needs to stop early, that's no problem, because
// repeated calls to Stop have no effect.
o.Stop()
Eventually(stopped).Should(BeClosed())
})

It("should reconcile each replicated job service received", func() {
// Create some services. Wait a moment, and then check that they
// are reconciled.
err := s.Update(func(tx store.Tx) error {
for i := 0; i < 3; i++ {
service := &api.Service{
ID: fmt.Sprintf("service%v", i),
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: fmt.Sprintf("service%v", i),
},
Mode: &api.ServiceSpec_ReplicatedJob{
ReplicatedJob: &api.ReplicatedJob{},
},
},
}

if err := store.CreateService(tx, service); err != nil {
return err
}
}
return nil
})
Expect(err).ToNot(HaveOccurred())

Eventually(f.getServicesReconciled).Should(ConsistOf(
"service0", "service1", "service2",
))
})

It("should not reconcile anything after calling Stop", func() {
err := s.Update(func(tx store.Tx) error {
service := &api.Service{
ID: "service0",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "service0",
},
Mode: &api.ServiceSpec_ReplicatedJob{
ReplicatedJob: &api.ReplicatedJob{},
},
},
}

return store.CreateService(tx, service)
})

Expect(err).ToNot(HaveOccurred())

Eventually(f.getServicesReconciled).Should(ConsistOf("service0"))

o.Stop()

err = s.Update(func(tx store.Tx) error {
service := &api.Service{
ID: "service1",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "service1",
},
Mode: &api.ServiceSpec_ReplicatedJob{
ReplicatedJob: &api.ReplicatedJob{},
},
},
}

return store.CreateService(tx, service)
})

// service1 should never be reconciled.
Consistently(f.getServicesReconciled).Should(ConsistOf("service0"))
})
})
})
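The fake reconciler above takes its mutex inside ReconcileService and hands assertions a copy of the slice through getServicesReconciled, so Gomega's Eventually and Consistently can poll it from the test goroutine without racing the orchestrator goroutine. Here is a standalone sketch of that copy-under-lock pattern, using a generic recorder type in place of the fake reconciler (no Ginkgo or Gomega involved; all names are illustrative):

package main

import (
	"fmt"
	"sync"
)

// recorder collects IDs from concurrent callers and exposes a race-free
// snapshot for assertions.
type recorder struct {
	mu  sync.Mutex
	ids []string
}

func (r *recorder) add(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ids = append(r.ids, id)
}

// snapshot returns a copy so callers never read the underlying slice while
// another goroutine is appending to it.
func (r *recorder) snapshot() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]string, len(r.ids))
	copy(out, r.ids)
	return out
}

func main() {
	r := &recorder{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.add(fmt.Sprintf("service%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(r.snapshot()) // e.g. [service2 service0 service1]
}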
25 changes: 17 additions & 8 deletions manager/orchestrator/replicatedjob/reconciler.go
@@ -19,17 +19,12 @@ type reconciler interface {
type reconcilerObj struct {
// we need the store, of course, to do updates
store *store.MemoryStore

// a copy of the cluster is needed, because we need it when creating tasks
// to set the default log driver
cluster *api.Cluster
}

// newReconciler creates a new reconciler object
func newReconciler(store *store.MemoryStore, cluster *api.Cluster) reconciler {
func newReconciler(store *store.MemoryStore) reconciler {
return &reconcilerObj{
store: store,
cluster: cluster,
store: store,
}
}

@@ -41,13 +36,27 @@ func (r *reconcilerObj) ReconcileService(id string) error {
var (
service *api.Service
tasks []*api.Task
cluster *api.Cluster
viewErr error
)
// first, get the service and all of its tasks
r.store.View(func(tx store.ReadTx) {
service = store.GetService(tx, id)

tasks, viewErr = store.FindTasks(tx, store.ByServiceID(id))

// there should only ever be 1 cluster object, but for reasons
// forgotten by me, it needs to be retrieved in a rather roundabout way
// from the store
var clusters []*api.Cluster
clusters, viewErr = store.FindClusters(tx, store.All)
if len(clusters) == 1 {
cluster = clusters[0]
} else if len(clusters) > 1 {
// this should never happen, and indicates that the system is
// broken.
panic("there should never be more than one cluster object")
}
})

// errors during view should only happen in a few rather catastrophic
@@ -181,7 +190,7 @@ func (r *reconcilerObj) ReconcileService(id string) error {
}
}

task := orchestrator.NewTask(r.cluster, service, slot, "")
task := orchestrator.NewTask(cluster, service, slot, "")
// when we create the task, we also need to set the
// JobIteration.
task.JobIteration = &api.Version{Index: jobVersion}
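With the cluster no longer cached on the orchestrator, ReconcileService above reads the service, its tasks, and the singleton cluster object inside a single store.View, so the writes that follow operate on one consistent snapshot, and the cluster is passed to orchestrator.NewTask only at task-creation time. A condensed sketch of that read phase, written as if it lived in the same package; the import paths are assumed to be the usual swarmkit ones and the helper name readServiceState is illustrative:

package replicatedjob

import (
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/state/store"
)

// readServiceState gathers everything the reconciler needs in one read
// transaction: the service, its tasks, and the (at most one) cluster object.
func readServiceState(s *store.MemoryStore, id string) (*api.Service, []*api.Task, *api.Cluster, error) {
	var (
		service *api.Service
		tasks   []*api.Task
		cluster *api.Cluster
		viewErr error
	)
	s.View(func(tx store.ReadTx) {
		service = store.GetService(tx, id)

		tasks, viewErr = store.FindTasks(tx, store.ByServiceID(id))
		if viewErr != nil {
			return
		}

		// As in the diff above, at most one cluster object is expected.
		var clusters []*api.Cluster
		clusters, viewErr = store.FindClusters(tx, store.All)
		if len(clusters) == 1 {
			cluster = clusters[0]
		}
	})
	return service, tasks, cluster, viewErr
}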
