Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

scheduler: Fix test races #2961

Merged
merged 2 commits into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions controller/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *S) TestKillJob(c *C) {
State: ct.JobStateStarting,
Meta: map[string]string{"some": "info"},
})
hc := tu.NewFakeHostClient(hostID)
hc := tu.NewFakeHostClient(hostID, false)
hc.AddJob(&host.Job{ID: jobID})
s.cc.AddHost(hc)

Expand All @@ -153,7 +153,7 @@ func (s *S) TestRunJobDetached(c *C) {
app := s.createTestApp(c, &ct.App{Name: "run-detached"})
artifact := s.createTestArtifact(c, &ct.Artifact{Type: host.ArtifactTypeDocker, URI: "docker://foo/bar"})
hostID := fakeHostID()
host := tu.NewFakeHostClient(hostID)
host := tu.NewFakeHostClient(hostID, false)
s.cc.AddHost(host)

release := s.createTestRelease(c, &ct.Release{
Expand Down Expand Up @@ -204,7 +204,7 @@ func (s *S) TestRunJobDetached(c *C) {
func (s *S) TestRunJobAttached(c *C) {
app := s.createTestApp(c, &ct.App{Name: "run-attached"})
hostID := fakeHostID()
hc := tu.NewFakeHostClient(hostID)
hc := tu.NewFakeHostClient(hostID, false)
s.cc.AddHost(hc)

done := make(chan struct{})
Expand Down
7 changes: 7 additions & 0 deletions controller/scheduler/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/flynn/flynn/controller/testutils"
"github.com/flynn/flynn/controller/utils"
"github.com/flynn/flynn/host/types"
"github.com/flynn/flynn/pkg/stream"
Expand Down Expand Up @@ -88,6 +89,12 @@ func (h *Host) StreamEventsTo(ch chan *host.Event) (map[string]host.ActiveJob, e
break eventLoop
}
ch <- event

// if the host is a FakeHostClient with TestEventHook
// set, send on the channel to synchronize with tests
if h, ok := h.client.(*testutils.FakeHostClient); ok && h.TestEventHook != nil {
h.TestEventHook <- struct{}{}
}
case <-h.stop:
return
}
Expand Down
22 changes: 6 additions & 16 deletions controller/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func createTestScheduler(cluster utils.ClusterClient, discoverd Discoverd, l log

func newTestHosts() map[string]*FakeHostClient {
return map[string]*FakeHostClient{
testHostID: NewFakeHostClient(testHostID),
testHostID: NewFakeHostClient(testHostID, false),
}
}

Expand Down Expand Up @@ -222,7 +222,6 @@ func (TestSuite) TestFormationChange(c *C) {
// Test scaling up an existing formation
c.Log("Test scaling up an existing formation. Wait for formation change and job start")
s.PutFormation(&ct.Formation{AppID: app.ID, ReleaseID: release.ID, Processes: map[string]int{"web": 4}})
s.waitFormationChange()
for i := 0; i < 3; i++ {
job := s.waitJobStart()
c.Assert(job.Type, Equals, testJobType)
Expand All @@ -234,7 +233,6 @@ func (TestSuite) TestFormationChange(c *C) {
// Test scaling down an existing formation
c.Log("Test scaling down an existing formation. Wait for formation change and job stop")
s.PutFormation(&ct.Formation{AppID: app.ID, ReleaseID: release.ID, Processes: map[string]int{"web": 1}})
s.waitFormationChange()
for i := 0; i < 3; i++ {
s.waitJobStop()
}
Expand All @@ -249,9 +247,8 @@ func (TestSuite) TestFormationChange(c *C) {
s.CreateRelease(release)
c.Assert(len(s.formations), Equals, 1)
s.PutFormation(&ct.Formation{AppID: app.ID, ReleaseID: release.ID, Processes: processes})
s.waitFormationChange()
c.Assert(len(s.formations), Equals, 2)
job := s.waitJobStart()
c.Assert(len(s.formations), Equals, 2)
c.Assert(job.Type, Equals, testJobType)
c.Assert(job.AppID, Equals, app.ID)
c.Assert(job.ReleaseID, Equals, release.ID)
Expand All @@ -262,7 +259,7 @@ func (TestSuite) TestRectify(c *C) {
defer s.Stop()

// wait for the formation to cascade to the scheduler
key := s.waitRectify()
key := utils.FormationKey{AppID: testAppID, ReleaseID: testReleaseID}
job := s.waitJobStart()
jobs := make(map[string]*Job)
jobs[job.JobID] = job
Expand All @@ -282,7 +279,6 @@ func (TestSuite) TestRectify(c *C) {

// Verify that the scheduler stops the extra job
c.Log("Verify that the scheduler stops the extra job")
s.waitRectify()
job = s.waitJobStop()
c.Assert(job.JobID, Equals, config.ID)
delete(jobs, job.JobID)
Expand Down Expand Up @@ -377,7 +373,7 @@ func (TestSuite) TestMultipleHosts(c *C) {
})

c.Log("Add a host to the cluster, then create a new app, artifact, release, and associated formation.")
host2 := NewFakeHostClient("host2")
host2 := NewFakeHostClient("host2", true)
fakeCluster.AddHost(host2)
hosts[host2.ID()] = host2
app := &ct.App{ID: "test-app-2", Name: "test-app-2"}
Expand All @@ -389,7 +385,6 @@ func (TestSuite) TestMultipleHosts(c *C) {
s.CreateArtifact(artifact)
s.CreateRelease(release)
s.PutFormation(&ct.Formation{AppID: app.ID, ReleaseID: release.ID, Processes: processes})
s.waitFormationChange()
s.waitJobStart()
s.waitJobStart()
assertJobs(hostJobs{
Expand All @@ -402,7 +397,7 @@ func (TestSuite) TestMultipleHosts(c *C) {
},
})

host3 := NewFakeHostClient("host3")
host3 := NewFakeHostClient("host3", true)
c.Log("Add a host, wait for omni job start on that host.")
fakeCluster.AddHost(host3)
s.waitJobStart()
Expand All @@ -423,7 +418,6 @@ func (TestSuite) TestMultipleHosts(c *C) {
host3.CrashJob("job4")
s.waitJobStop()
s.waitJobStart()
s.waitRectify()
assertJobs(hostJobs{
host1: {
{Type: "web", state: JobStateStarting},
Expand Down Expand Up @@ -464,7 +458,7 @@ func (TestSuite) TestMultipleHosts(c *C) {

// resume the scheduler and check it moves the job back to host2
s.Resume()
s.waitRectify()
s.waitJobStart()
s.waitJobStart()
assertJobs(hostJobs{
host1: {
Expand Down Expand Up @@ -507,7 +501,6 @@ func (TestSuite) TestMultipleHosts(c *C) {
host1.Healthy = false
fakeCluster.RemoveHost(host1.ID())
s.waitFormationSync()
s.waitRectify()
s.waitJobStart()
assertJobs(hostJobs{
host1: {
Expand Down Expand Up @@ -766,7 +759,6 @@ func (TestSuite) TestScaleCriticalApp(c *C) {
s.CreateArtifact(artifact)
s.CreateRelease(release)
s.PutFormation(&ct.Formation{AppID: app.ID, ReleaseID: release.ID, Processes: processes})
s.waitFormationChange()
s.waitJobStart()

// check we can't scale it down
Expand All @@ -779,11 +771,9 @@ func (TestSuite) TestScaleCriticalApp(c *C) {
newRelease := NewRelease("critical-release-2", artifact, processes)
s.CreateRelease(newRelease)
s.PutFormation(&ct.Formation{AppID: app.ID, ReleaseID: newRelease.ID, Processes: processes})
s.waitFormationChange()
s.waitJobStart()

// check we can now scale the original down
s.PutFormation(&ct.Formation{AppID: app.ID, ReleaseID: release.ID, Processes: nil})
s.waitFormationChange()
s.waitJobStop()
}
15 changes: 13 additions & 2 deletions controller/testutils/fake_host_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/flynn/flynn/pkg/stream"
)

func NewFakeHostClient(hostID string) *FakeHostClient {
return &FakeHostClient{
func NewFakeHostClient(hostID string, sync bool) *FakeHostClient {
h := &FakeHostClient{
hostID: hostID,
stopped: make(map[string]bool),
attach: make(map[string]attachFunc),
Expand All @@ -24,6 +24,10 @@ func NewFakeHostClient(hostID string) *FakeHostClient {
eventChannels: make(map[chan<- *host.Event]struct{}),
Healthy: true,
}
if sync {
h.TestEventHook = make(chan struct{})
}
return h
}

type FakeHostClient struct {
Expand All @@ -37,6 +41,7 @@ type FakeHostClient struct {
eventChannels map[chan<- *host.Event]struct{}
jobsMtx sync.RWMutex
Healthy bool
TestEventHook chan struct{}
}

func (c *FakeHostClient) ID() string { return c.hostID }
Expand Down Expand Up @@ -80,6 +85,9 @@ func (c *FakeHostClient) AddJob(job *host.Job) error {
JobID: job.ID,
Job: &j,
}
if c.TestEventHook != nil {
<-c.TestEventHook
}
}
return nil
}
Expand Down Expand Up @@ -126,6 +134,9 @@ func (c *FakeHostClient) stop(id string) error {
JobID: id,
Job: &job,
}
if c.TestEventHook != nil {
<-c.TestEventHook
}
}
return nil
}
Expand Down