From 6af1c937b1b92d911af303761f56e8f306477e9d Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 16 Mar 2023 21:35:52 +0800 Subject: [PATCH] test: should not leak goroutines after test finished The original flaky test shows in CI pipeline[1], but gotestsum run into a golang issue[2]. The error message is not clear from summary, like ``` {"Time":"2023-03-02T09:19:38.754394861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1433 +0x7e4\n"} {"Time":"2023-03-02T09:19:38.754414561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/runtime/panic.go:476 +0x32\n"} {"Time":"2023-03-02T09:19:38.754430561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x47\n"} {"Time":"2023-03-02T09:19:38.754482561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:883 +0xc4\n"} {"Time":"2023-03-02T09:19:38.754497661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:876 +0xa4\n"} {"Time":"2023-03-02T09:19:38.754512161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:927 +0x6a\n"} {"Time":"2023-03-02T09:19:38.754567661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zaptest.testingWriter.Write()\n"} {"Time":"2023-03-02T09:19:38.754571261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zaptest/logger.go:130 +0x12c\n"} {"Time":"2023-03-02T09:19:38.754582861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zaptest.(*testingWriter).Write()\n"} {"Time":"2023-03-02T09:19:38.754597761Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zapcore.(*ioCore).Write()\n"} {"Time":"2023-03-02T09:19:38.754600961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/core.go:99 +0x199\n"} {"Time":"2023-03-02T09:19:38.754612761Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zapcore.(*CheckedEntry).Write()\n"} {"Time":"2023-03-02T09:19:38.754618561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/entry.go:255 +0x2ce\n"} {"Time":"2023-03-02T09:19:38.754630161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap.(*Logger).Info()\n"} {"Time":"2023-03-02T09:19:38.754633261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/logger.go:220 +0x6a\n"} {"Time":"2023-03-02T09:19:38.754644861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.(*treeIndex).Compact()\n"} {"Time":"2023-03-02T09:19:38.754648461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/index.go:194 +0x144\n"} {"Time":"2023-03-02T09:19:38.754664961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.(*store).scheduleCompaction()\n"} {"Time":"2023-03-02T09:19:38.754670161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore_compaction.go:29 +0xbb\n"} {"Time":"2023-03-02T09:19:38.754681861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.(*store).compact.func1()\n"} {"Time":"2023-03-02T09:19:38.754690561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore.go:235 +0x9e\n"} {"Time":"2023-03-02T09:19:38.754720061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.job.Do()\n"} {"Time":"2023-03-02T09:19:38.754724161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:41 +0x70\n"} {"Time":"2023-03-02T09:19:38.754736161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.(*job).Do()\n"} {"Time":"2023-03-02T09:19:38.754750961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.(*fifo).executeJob()\n"} {"Time":"2023-03-02T09:19:38.754754161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:206 +0x101\n"} {"Time":"2023-03-02T09:19:38.754765861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.(*fifo).run()\n"} {"Time":"2023-03-02T09:19:38.754769061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:187 +0x1a5\n"} {"Time":"2023-03-02T09:19:38.754780461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler.func1()\n"} {"Time":"2023-03-02T09:19:38.754783661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:101 +0x39\n"} {"Time":"2023-03-02T09:19:38.754824061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x75d\n"} FAIL: (code:1): % (cd server && 'env' 'ETCD_VERIFY=all' 'go' 'test' '-v' '-json' '-short' '-timeout=3m' '--race=true' '--cpu=4' './...' '-p=2') {"Time":"2023-03-02T09:19:38.754838961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1846 +0x99\n"} {"Time":"2023-03-02T09:19:38.754854961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1446 +0x216\n"} {"Time":"2023-03-02T09:19:38.754893461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1844 +0x7ec\n"} {"Time":"2023-03-02T09:19:38.754908961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1726 +0xa84\n"} {"Time":"2023-03-02T09:19:38.754957861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler()\n"} {"Time":"2023-03-02T09:19:38.754961061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:101 +0x3b6\n"} {"Time":"2023-03-02T09:19:38.754976161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.NewStore()\n"} {"Time":"2023-03-02T09:19:38.754979361Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore.go:111 +0x331\n"} {"Time":"2023-03-02T09:19:38.754991061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.TestHashByRevValue()\n"} {"Time":"2023-03-02T09:19:38.754994261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/hash_test.go:36 +0xa4\n"} {"Time":"2023-03-02T09:19:38.755010061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1446 +0x216\n"} {"Time":"2023-03-02T09:19:38.755024461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x47\n"} === Failed === FAIL: storage/mvcc (0.00s) === CONT testing.go:1319: race detected during execution of test FAIL FAIL go.etcd.io/etcd/server/v3/storage/mvcc 9.852s ``` After using the following command to reproduce it, we can get the error like: ```bash go test -v -p=2 --cpu=4 -count=1000 -failfast --race=true -short -timeout=30m ./ --- PASS: TestHashByRevValueLastRevision (0.12s) ================== WARNING: DATA RACE Read at 0x00c002024043 by goroutine 65745: testing.(*common).logDepth() /usr/lib/go-1.19/src/testing/testing.go:883 +0xc4 testing.(*common).log() /usr/lib/go-1.19/src/testing/testing.go:876 +0xa4 testing.(*common).Logf() /usr/lib/go-1.19/src/testing/testing.go:927 +0x6a testing.(*T).Logf() :1 +0x75 go.uber.org/zap/zaptest.testingWriter.Write() /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zaptest/logger.go:130 +0x12c go.uber.org/zap/zaptest.(*testingWriter).Write() :1 +0x7e go.uber.org/zap/zapcore.(*ioCore).Write() /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/core.go:99 +0x199 go.uber.org/zap/zapcore.(*CheckedEntry).Write() /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/entry.go:255 +0x2ce go.uber.org/zap.(*Logger).Info() /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/logger.go:220 +0x6a go.etcd.io/etcd/server/v3/storage/mvcc.(*treeIndex).Compact() /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/index.go:194 +0x144 go.etcd.io/etcd/server/v3/storage/mvcc.(*store).scheduleCompaction() /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore_compaction.go:29 +0xbb go.etcd.io/etcd/server/v3/storage/mvcc.(*store).compact.func1() /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore.go:235 +0x9e go.etcd.io/etcd/pkg/v3/schedule.job.Do() /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:41 +0x70 go.etcd.io/etcd/pkg/v3/schedule.(*job).Do() :1 +0x29 go.etcd.io/etcd/pkg/v3/schedule.(*fifo).executeJob() /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:206 +0x101 go.etcd.io/etcd/pkg/v3/schedule.(*fifo).run() /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:187 +0x1a5 go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler.func1() /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:101 +0x39 Previous write at 0x00c002024043 by goroutine 65743: testing.tRunner.func1() /usr/lib/go-1.19/src/testing/testing.go:1433 +0x7e4 runtime.deferreturn() /usr/lib/go-1.19/src/runtime/panic.go:476 +0x32 testing.(*T).Run.func1() /usr/lib/go-1.19/src/testing/testing.go:1493 +0x47 Goroutine 65745 (running) created at: go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler() /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:101 +0x3b6 go.etcd.io/etcd/server/v3/storage/mvcc.NewStore() /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore.go:111 +0x331 go.etcd.io/etcd/server/v3/storage/mvcc.TestHashByRevValueLastRevision() /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/hash_test.go:76 +0xa4 testing.tRunner() /usr/lib/go-1.19/src/testing/testing.go:1446 +0x216 testing.(*T).Run.func1() /usr/lib/go-1.19/src/testing/testing.go:1493 +0x47 Goroutine 65743 (running) created at: testing.(*T).Run() /usr/lib/go-1.19/src/testing/testing.go:1493 +0x75d testing.runTests.func1() /usr/lib/go-1.19/src/testing/testing.go:1846 +0x99 testing.tRunner() /usr/lib/go-1.19/src/testing/testing.go:1446 +0x216 testing.runTests() /usr/lib/go-1.19/src/testing/testing.go:1844 +0x7ec testing.(*M).Run() /usr/lib/go-1.19/src/testing/testing.go:1726 +0xa84 main.main() _testmain.go:265 +0x2e9 ================== ``` The schedule for compact is handled asynchronously and it might use `t.Logf` after go-test marks the case is done. And there is a comment from go-test: ```go // https://github.com/golang/go/blob/c69ff3a7d0c8bd2878662034c1cbce8613fa6f13/src/testing/testing.go#LL1580C3-L1582C16 // Do not lock t.done to allow race detector to detect race in case // the user does not appropriately synchronize a goroutine. t.done = true ``` We need to ensure that all the goroutines should be closed before case finish. REF: [1]: https://github.com/etcd-io/etcd/actions/runs/4312405975/jobs/7522924734 [2]: https://github.com/gotestyourself/gotestsum/issues/310 Signed-off-by: Wei Fu --- server/storage/mvcc/hash_test.go | 9 ++- .../storage/mvcc/kvstore_compaction_test.go | 9 ++- server/storage/mvcc/kvstore_test.go | 7 +- server/storage/mvcc/watchable_store_test.go | 70 +++++++------------ 4 files changed, 45 insertions(+), 50 deletions(-) diff --git a/server/storage/mvcc/hash_test.go b/server/storage/mvcc/hash_test.go index 2c7a35f9a600..8d1623efa25f 100644 --- a/server/storage/mvcc/hash_test.go +++ b/server/storage/mvcc/hash_test.go @@ -32,8 +32,9 @@ import ( // output which would have catastrophic consequences. Expected output is just // hardcoded, so please regenerate it every time you change input parameters. func TestHashByRevValue(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -72,8 +73,9 @@ func TestHashByRevValue(t *testing.T) { } func TestHashByRevValueLastRevision(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -131,8 +133,9 @@ func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { // TestCompactionHash tests compaction hash // TODO: Change this to fuzz test func TestCompactionHash(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) testutil.TestCompactionHash(context.Background(), t, hashTestCase{s}, s.cfg.CompactionBatchLimit) } diff --git a/server/storage/mvcc/kvstore_compaction_test.go b/server/storage/mvcc/kvstore_compaction_test.go index 2f8fac83c775..b9b7822739bf 100644 --- a/server/storage/mvcc/kvstore_compaction_test.go +++ b/server/storage/mvcc/kvstore_compaction_test.go @@ -110,7 +110,10 @@ func TestScheduleCompaction(t *testing.T) { func TestCompactAllAndRestore(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer func() { + b.Close() + os.Remove(tmpPath) + }() s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease) @@ -143,4 +146,8 @@ func TestCompactAllAndRestore(t *testing.T) { if err != nil { t.Errorf("unexpect range error %v", err) } + err = s1.Close() + if err != nil { + t.Fatal(err) + } } diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index c755827ce68f..4399d61d8bc8 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -480,7 +480,7 @@ func TestRestoreDelete(t *testing.T) { func TestRestoreContinueUnfinishedCompaction(t *testing.T) { tests := []string{"recreate", "restore"} for _, test := range tests { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -527,9 +527,10 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { time.Sleep(100 * time.Millisecond) continue } + // FIXME(fuweid): it doesn't test restore one? return } - + cleanup(s, b, tmpPath) t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) } } @@ -705,7 +706,7 @@ func TestTxnPut(t *testing.T) { func TestConcurrentReadNotBlockingWrite(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) // write something to read later s.Put([]byte("foo"), []byte("bar"), lease.NoLease) diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index a36c3ee1430d..b7d6868f84ba 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -17,7 +17,6 @@ package mvcc import ( "bytes" "fmt" - "os" "reflect" "sync" "testing" @@ -34,20 +33,16 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - w.Watch(0, testKey, nil, 0) + defer w.Close() + w.Watch(0, testKey, nil, 0) if !s.synced.contains(string(testKey)) { // the key must have had an entry in synced t.Errorf("existence = false, want true") @@ -57,18 +52,16 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - wt, _ := w.Watch(0, testKey, nil, 0) + defer w.Close() + wt, _ := w.Watch(0, testKey, nil, 0) if err := w.Cancel(wt); err != nil { t.Error(err) } @@ -94,12 +87,10 @@ func TestCancelUnsynced(t *testing.T) { // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) // Put a key so that we can spawn watchers on that key. // (testKey in this test). This increases the rev to 1, @@ -110,6 +101,7 @@ func TestCancelUnsynced(t *testing.T) { s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() + defer w.Close() // arbitrary number for watchers watcherN := 100 @@ -146,18 +138,17 @@ func TestSyncWatchers(t *testing.T) { store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}), unsynced: newWatcherGroup(), synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() + defer w.Close() // arbitrary number for watchers watcherN := 100 @@ -227,11 +218,8 @@ func TestSyncWatchers(t *testing.T) { func TestWatchCompacted(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() testKey := []byte("foo") testValue := []byte("bar") @@ -246,8 +234,9 @@ func TestWatchCompacted(t *testing.T) { } w := s.NewWatchStream() - wt, _ := w.Watch(0, testKey, nil, compactRev-1) + defer w.Close() + wt, _ := w.Watch(0, testKey, nil, compactRev-1) select { case resp := <-w.Chan(): if resp.WatchID != wt { @@ -264,17 +253,14 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") w := s.NewWatchStream() + defer w.Close() + wrev := int64(10) w.Watch(0, testKey, nil, wrev) @@ -317,6 +303,8 @@ func TestWatchRestore(t *testing.T) { defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() + defer w.Close() + w.Watch(0, testKey, nil, rev-1) time.Sleep(delay) @@ -365,6 +353,8 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // create a watcher with a future revision // add to "synced" watcher group (startRev > s.store.currentRev) w1 := s1.NewWatchStream() + defer w1.Close() + w1.Watch(0, testKey, nil, startRev) // make "s2" ends up with a higher last revision @@ -407,8 +397,7 @@ func TestWatchBatchUnsynced(t *testing.T) { oldMaxRevs := watchBatchMaxRevs defer func() { watchBatchMaxRevs = oldMaxRevs - s.store.Close() - os.Remove(tmpPath) + cleanup(s, b, tmpPath) }() batches := 3 watchBatchMaxRevs = 4 @@ -419,6 +408,8 @@ func TestWatchBatchUnsynced(t *testing.T) { } w := s.NewWatchStream() + defer w.Close() + w.Watch(0, v, nil, 1) for i := 0; i < batches; i++ { if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs { @@ -539,9 +530,7 @@ func TestWatchVictims(t *testing.T) { s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) + cleanup(s, b, tmpPath) chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync }() @@ -616,12 +605,7 @@ func TestWatchVictims(t *testing.T) { func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey, testValue := []byte("foo"), []byte("bar") var wg sync.WaitGroup