Fix flaky tests
It is likely that a large portion of test flakiness, especially in CI,
comes from the fact that swarmkit components under test are started in
goroutines, but those goroutines never have an opportunity to run. This
adds code ensuring those goroutines are scheduled and run, which should
hopefully solve many inexplicably flaky tests.
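
As a rough illustration of the scheduling guarantee this relies on, here is a
minimal, self-contained sketch of the pattern; the real helper,
testutils.EnsureRuns in manager/orchestrator/testutils/testutils.go, appears
in the diff below, and the lowercase names here are only illustrative.

    package main

    import (
    	"fmt"
    	"time"
    )

    // ensureRuns starts closure in a goroutine and blocks until that goroutine
    // has actually been scheduled, so the caller knows the component has started.
    // It returns a channel that is closed when the closure returns.
    func ensureRuns(closure func()) <-chan struct{} {
    	started := make(chan struct{})
    	stopped := make(chan struct{})
    	go func() {
    		close(started)
    		closure()
    		close(stopped)
    	}()
    	<-started // do not return until the goroutine has run at least this far
    	return stopped
    }

    func main() {
    	done := ensureRuns(func() {
    		fmt.Println("component is running")
    		time.Sleep(10 * time.Millisecond)
    	})
    	<-done // closed once the closure has exited
    	fmt.Println("component has stopped")
    }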

Additionally, increased test timeouts to hopefully cover a few more
flaky cases.

Finally, removed direct use of the atomic package, in favor of less
efficient but higher-level mutexes.
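
For the atomic-to-mutex change, a minimal sketch of the before and after shape
of the flag handling in manager/orchestrator/replicated/update_test.go
(variable names follow that file; the surrounding test code is omitted):

    package main

    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    )

    func main() {
    	// Before: a uint32 flag accessed through the atomic package. Every
    	// reader and writer must remember to go through Load/StoreUint32,
    	// which is easy to get wrong.
    	var failImage2 uint32
    	atomic.StoreUint32(&failImage2, 1)
    	fmt.Println("atomic flag set:", atomic.LoadUint32(&failImage2) == 1)

    	// After: a plain bool guarded by a mutex. Slightly more overhead, but
    	// ordinary reads and writes under the lock are harder to misuse.
    	var (
    		failMu     sync.Mutex
    		failImage1 bool
    	)
    	failMu.Lock()
    	failImage1 = true
    	failMu.Unlock()

    	failMu.Lock()
    	fmt.Println("mutex-guarded flag set:", failImage1)
    	failMu.Unlock()
    }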

Signed-off-by: Drew Erny <drew.erny@docker.com>
(cherry picked from commit 06a3566)
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
dperny authored and thaJeztah committed Sep 25, 2019
1 parent 2d28e8f commit f5adf36
Showing 5 changed files with 94 additions and 73 deletions.
36 changes: 18 additions & 18 deletions manager/orchestrator/global/global_test.go
@@ -115,9 +115,9 @@ func setup(t *testing.T, store *store.MemoryStore, watch chan events.Event) *Orc
ctx := context.Background()
// Start the global orchestrator.
global := NewGlobalOrchestrator(store)
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, global.Run(ctx))
}()
})

addService(t, store, service1)
testutils.Expect(t, watch, api.EventCreateService{})
@@ -579,9 +579,9 @@ func TestInitializationRejectedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -642,9 +642,9 @@ func TestInitializationFailedTasks(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -734,9 +734,9 @@ func TestInitializationExtraTask(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.True(t, observedTask1.ID == "task1" || observedTask1.ID == "task2")
@@ -814,9 +814,9 @@ func TestInitializationMultipleServices(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Nothing should happen because both tasks are up to date.
select {
@@ -955,9 +955,9 @@ func TestInitializationTaskWithoutService(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, observedTask1.ID, "task2")
@@ -1013,9 +1013,9 @@ func TestInitializationTaskOnDrainedNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -1085,9 +1085,9 @@ func TestInitializationTaskOnNonexistentNode(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

observedTask1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, observedTask1.ID, "task1")
@@ -1254,9 +1254,9 @@ func TestInitializationRestartHistory(t *testing.T) {
orchestrator := NewGlobalOrchestrator(s)
defer orchestrator.Stop()

go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
})

// Fail the running task
s.Update(func(tx store.Tx) error {
72 changes: 38 additions & 34 deletions manager/orchestrator/replicated/update_test.go
@@ -1,7 +1,7 @@
package replicated

import (
"sync/atomic"
"sync"
"testing"
"time"

@@ -27,7 +27,7 @@ func TestUpdaterRollback(t *testing.T) {
}

func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_FailureAction, setMonitor bool, useSpecVersion bool) {
// this test should complete within 20 seconds. if not, bail out
// this test should complete within 30 seconds. if not, bail out
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

@@ -37,14 +37,15 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa

orchestrator := NewReplicatedOrchestrator(s)

// TODO(dperny): these are used with atomic.StoreUint32 and
// atomic.LoadUint32. using atomic primitives is bad practice and easy to
// mess up
// These variables are used to signal that The Fail Loop should start
// failing image1 tasks: once failImage1 is set to true (under failMu),
// The Failing Can Begin.
var (
failImage1 uint32
failImage2 uint32
failMu sync.Mutex
failImage1 bool
)

// create a watch for task creates, which we will use to verify that the
// updater works correctly.
watchCreate, cancelCreate := state.Watch(s.WatchQueue(), api.EventCreateTask{})
defer cancelCreate()

@@ -54,8 +55,13 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
// Fail new tasks the updater tries to run
watchUpdate, cancelUpdate := state.Watch(s.WatchQueue(), api.EventUpdateTask{})
defer cancelUpdate()
go func() {

// We're gonna call this big chunk here "The Fail Loop". Its job is to put
// tasks into a Failed state under certain conditions.
testutils.EnsureRuns(func() {
failedLast := false
// typical go pattern: infinite for loop in a goroutine, exits on
// ctx.Done
for {
var e events.Event
select {
@@ -67,15 +73,26 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
if task.DesiredState == task.Status.State {
continue
}
if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed && task.Status.State != api.TaskStateRunning {
// This used to have a 3rd clause,
// "&& task.Status.State != api.TaskStateRunning"
// however, this is unneeded. If DesiredState is Running, then
// actual state cannot be Running, because that would get caught
// in the condition above (DesiredState == State)
if task.DesiredState == api.TaskStateRunning && task.Status.State != api.TaskStateFailed {
err := s.Update(func(tx store.Tx) error {
task = store.GetTask(tx, task.ID)
// Never fail two image2 tasks in a row, so there's a mix of
// failed and successful tasks for the rollback.
if task.Spec.GetContainer().Image == "image1" && atomic.LoadUint32(&failImage1) == 1 {
// lock mutex governing access to failImage1.
failMu.Lock()
defer failMu.Unlock()
// we should start failing tasks with image1 only after failImage1 is set
if task.Spec.GetContainer().Image == "image1" && failImage1 {
// only fail the task if failImage1 has been set to true (it is
// written under failMu later in the test)
task.Status.State = api.TaskStateFailed
failedLast = true
} else if task.Spec.GetContainer().Image == "image2" && atomic.LoadUint32(&failImage2) == 1 && !failedLast {
} else if task.Spec.GetContainer().Image == "image2" && !failedLast {
// Never fail two image2 tasks in a row, so there's a mix of
// failed and successful tasks for the rollback.
task.Status.State = api.TaskStateFailed
failedLast = true
} else {
@@ -94,7 +111,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.NoError(t, err)
}
}
}()
})

// Create a service with four replicas specified before the orchestrator
// is started. This should result in two tasks when the orchestrator
@@ -153,23 +170,9 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa

// Start the orchestrator.
var orchestratorError error
orchestratorDone := make(chan struct{})
// verify that the orchestrator has had a chance to run by blocking the
// main test routine until it has.
orchestratorRan := make(chan struct{})
go func() {
close(orchestratorRan)
// try not to fail the test in goroutines. it's racy. instead, save
// the error and check it in a defer
orchestratorDone := testutils.EnsureRuns(func() {
orchestratorError = orchestrator.Run(ctx)
close(orchestratorDone)
}()

select {
case <-orchestratorRan:
case <-ctx.Done():
t.Error("orchestrator did not start before test timed out")
}
})

defer func() {
orchestrator.Stop()
@@ -196,8 +199,6 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
assert.Equal(t, observedTask.Status.State, api.TaskStateNew)
assert.Equal(t, observedTask.Spec.GetContainer().Image, "image1")

atomic.StoreUint32(&failImage2, 1)

// Start a rolling update
err = s.Update(func(tx store.Tx) error {
s1 := store.GetService(tx, "id1")
@@ -268,6 +269,7 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
var e events.Event
select {
case e = <-watchServiceUpdate:
t.Log("service was updated")
case <-ctx.Done():
t.Error("test timed out before watchServiceUpdate provided an event")
return
@@ -278,10 +280,12 @@ func testUpdaterRollback(t *testing.T, rollbackFailureAction api.UpdateConfig_Fa
}
}

atomic.StoreUint32(&failImage1, 1)

// Repeat the rolling update but this time fail the tasks that the
// rollback creates.
failMu.Lock()
failImage1 = true
failMu.Unlock()

err = s.Update(func(tx store.Tx) error {
s1 := store.GetService(tx, "id1")
require.NotNil(t, s1)
34 changes: 16 additions & 18 deletions manager/orchestrator/taskreaper/task_reaper_test.go
@@ -173,7 +173,7 @@ func TestTaskReaperInit(t *testing.T) {
reaper := New(s)

// Now, start the reaper
go reaper.Run(ctx)
testutils.EnsureRuns(func() { reaper.Run(ctx) })

// And then stop the reaper. This will cause the reaper to run through its
// whole init phase and then immediately enter the loop body, get the stop
@@ -259,10 +259,10 @@ func TestTaskHistory(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -394,10 +394,8 @@ func TestTaskStateRemoveOnScaledown(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { assert.NoError(t, orchestrator.Run(ctx)) })
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -526,10 +524,10 @@ func TestTaskStateRemoveOnServiceRemoval(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, observedTask1.Status.State, api.TaskStateNew)
@@ -664,10 +662,10 @@ func TestServiceRemoveDeadTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator and the reaper.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
@@ -843,7 +841,7 @@ func TestTaskReaperBatching(t *testing.T) {
taskReaper := New(s)
taskReaper.tickSignal = make(chan struct{}, 1)
defer taskReaper.Stop()
go taskReaper.Run(ctx)
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

// None of the tasks we've created are eligible for deletion. We should
// see no task delete events. Wait for a tick signal, or 500ms to pass, to
@@ -1010,10 +1008,10 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.NoError(t, err)

// Start the orchestrator.
go func() {
testutils.EnsureRuns(func() {
assert.NoError(t, orchestrator.Run(ctx))
}()
go taskReaper.Run(ctx)
})
testutils.EnsureRuns(func() { taskReaper.Run(ctx) })

observedTask1 := testutils.WatchTaskCreate(t, watch)
assert.Equal(t, api.TaskStateNew, observedTask1.Status.State)
20 changes: 18 additions & 2 deletions manager/orchestrator/testutils/testutils.go
@@ -11,6 +11,22 @@ import (
"github.com/stretchr/testify/assert"
)

// EnsureRuns takes a closure and runs it in a goroutine, blocking until the
// goroutine has had an opportunity to run. It returns a channel which will be
// closed when the provided closure exits.
func EnsureRuns(closure func()) <-chan struct{} {
started := make(chan struct{})
stopped := make(chan struct{})
go func() {
close(started)
closure()
close(stopped)
}()

<-started
return stopped
}

// WatchTaskCreate waits for a task to be created.
func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
for {
@@ -22,7 +38,7 @@ func WatchTaskCreate(t *testing.T, watch chan events.Event) *api.Task {
if _, ok := event.(api.EventUpdateTask); ok {
assert.FailNow(t, "got EventUpdateTask when expecting EventCreateTask", fmt.Sprint(event))
}
case <-time.After(2 * time.Second):
case <-time.After(3 * time.Second):
assert.FailNow(t, "no task creation")
}
}
@@ -39,7 +55,7 @@ func WatchTaskUpdate(t *testing.T, watch chan events.Event) *api.Task {
if _, ok := event.(api.EventCreateTask); ok {
assert.FailNow(t, "got EventCreateTask when expecting EventUpdateTask", fmt.Sprint(event))
}
case <-time.After(time.Second):
case <-time.After(2 * time.Second):
assert.FailNow(t, "no task update")
}
}
