Skip to content

Commit

Permalink
test: should not leak goroutines after test finished
Browse files Browse the repository at this point in the history
The original flaky test shows in CI pipeline[1], but gotestsum run into
a golang issue[2]. The error message is not clear from summary, like

```
{"Time":"2023-03-02T09:19:38.754394861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1433 +0x7e4\n"}
{"Time":"2023-03-02T09:19:38.754414561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/runtime/panic.go:476 +0x32\n"}
{"Time":"2023-03-02T09:19:38.754430561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x47\n"}
{"Time":"2023-03-02T09:19:38.754482561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:883 +0xc4\n"}
{"Time":"2023-03-02T09:19:38.754497661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:876 +0xa4\n"}
{"Time":"2023-03-02T09:19:38.754512161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:927 +0x6a\n"}
{"Time":"2023-03-02T09:19:38.754567661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.uber.org/zap/zaptest.testingWriter.Write()\n"}
{"Time":"2023-03-02T09:19:38.754571261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zaptest/logger.go:130 +0x12c\n"}
{"Time":"2023-03-02T09:19:38.754582861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.uber.org/zap/zaptest.(*testingWriter).Write()\n"}
{"Time":"2023-03-02T09:19:38.754597761Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.uber.org/zap/zapcore.(*ioCore).Write()\n"}
{"Time":"2023-03-02T09:19:38.754600961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/core.go:99 +0x199\n"}
{"Time":"2023-03-02T09:19:38.754612761Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.uber.org/zap/zapcore.(*CheckedEntry).Write()\n"}
{"Time":"2023-03-02T09:19:38.754618561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/entry.go:255 +0x2ce\n"}
{"Time":"2023-03-02T09:19:38.754630161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.uber.org/zap.(*Logger).Info()\n"}
{"Time":"2023-03-02T09:19:38.754633261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/logger.go:220 +0x6a\n"}
{"Time":"2023-03-02T09:19:38.754644861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/server/v3/storage/mvcc.(*treeIndex).Compact()\n"}
{"Time":"2023-03-02T09:19:38.754648461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/server/storage/mvcc/index.go:194 +0x144\n"}
{"Time":"2023-03-02T09:19:38.754664961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/server/v3/storage/mvcc.(*store).scheduleCompaction()\n"}
{"Time":"2023-03-02T09:19:38.754670161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore_compaction.go:29 +0xbb\n"}
{"Time":"2023-03-02T09:19:38.754681861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/server/v3/storage/mvcc.(*store).compact.func1()\n"}
{"Time":"2023-03-02T09:19:38.754690561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore.go:235 +0x9e\n"}
{"Time":"2023-03-02T09:19:38.754720061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/pkg/v3/schedule.job.Do()\n"}
{"Time":"2023-03-02T09:19:38.754724161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:41 +0x70\n"}
{"Time":"2023-03-02T09:19:38.754736161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/pkg/v3/schedule.(*job).Do()\n"}
{"Time":"2023-03-02T09:19:38.754750961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/pkg/v3/schedule.(*fifo).executeJob()\n"}
{"Time":"2023-03-02T09:19:38.754754161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:206 +0x101\n"}
{"Time":"2023-03-02T09:19:38.754765861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/pkg/v3/schedule.(*fifo).run()\n"}
{"Time":"2023-03-02T09:19:38.754769061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:187 +0x1a5\n"}
{"Time":"2023-03-02T09:19:38.754780461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler.func1()\n"}
{"Time":"2023-03-02T09:19:38.754783661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:101 +0x39\n"}
{"Time":"2023-03-02T09:19:38.754824061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x75d\n"}
FAIL: (code:1):
  % (cd server && 'env' 'ETCD_VERIFY=all' 'go' 'test' '-v' '-json' '-short' '-timeout=3m' '--race=true' '--cpu=4' './...' '-p=2')
{"Time":"2023-03-02T09:19:38.754838961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1846 +0x99\n"}
{"Time":"2023-03-02T09:19:38.754854961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1446 +0x216\n"}
{"Time":"2023-03-02T09:19:38.754893461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1844 +0x7ec\n"}
{"Time":"2023-03-02T09:19:38.754908961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1726 +0xa84\n"}
{"Time":"2023-03-02T09:19:38.754957861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler()\n"}
{"Time":"2023-03-02T09:19:38.754961061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:101 +0x3b6\n"}
{"Time":"2023-03-02T09:19:38.754976161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/server/v3/storage/mvcc.NewStore()\n"}
{"Time":"2023-03-02T09:19:38.754979361Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore.go:111 +0x331\n"}
{"Time":"2023-03-02T09:19:38.754991061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"  go.etcd.io/etcd/server/v3/storage/mvcc.TestHashByRevValue()\n"}
{"Time":"2023-03-02T09:19:38.754994261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /home/runner/work/etcd/etcd/server/storage/mvcc/hash_test.go:36 +0xa4\n"}
{"Time":"2023-03-02T09:19:38.755010061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1446 +0x216\n"}
{"Time":"2023-03-02T09:19:38.755024461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":"      /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x47\n"}

=== Failed
=== FAIL: storage/mvcc  (0.00s)
=== CONT
    testing.go:1319: race detected during execution of test
FAIL
FAIL	go.etcd.io/etcd/server/v3/storage/mvcc	9.852s
```

After using the following command to reproduce it, we can get the error
like:

```bash
go test -v -p=2 --cpu=4 -count=1000 -failfast --race=true -short -timeout=30m ./
--- PASS: TestHashByRevValueLastRevision (0.12s)
==================
WARNING: DATA RACE
Read at 0x00c002024043 by goroutine 65745:
  testing.(*common).logDepth()
      /usr/lib/go-1.19/src/testing/testing.go:883 +0xc4
  testing.(*common).log()
      /usr/lib/go-1.19/src/testing/testing.go:876 +0xa4
  testing.(*common).Logf()
      /usr/lib/go-1.19/src/testing/testing.go:927 +0x6a
  testing.(*T).Logf()
      <autogenerated>:1 +0x75
  go.uber.org/zap/zaptest.testingWriter.Write()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zaptest/logger.go:130 +0x12c
  go.uber.org/zap/zaptest.(*testingWriter).Write()
      <autogenerated>:1 +0x7e
  go.uber.org/zap/zapcore.(*ioCore).Write()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/core.go:99 +0x199
  go.uber.org/zap/zapcore.(*CheckedEntry).Write()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/entry.go:255 +0x2ce
  go.uber.org/zap.(*Logger).Info()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/logger.go:220 +0x6a
  go.etcd.io/etcd/server/v3/storage/mvcc.(*treeIndex).Compact()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/index.go:194 +0x144
  go.etcd.io/etcd/server/v3/storage/mvcc.(*store).scheduleCompaction()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore_compaction.go:29 +0xbb
  go.etcd.io/etcd/server/v3/storage/mvcc.(*store).compact.func1()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore.go:235 +0x9e
  go.etcd.io/etcd/pkg/v3/schedule.job.Do()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:41 +0x70
  go.etcd.io/etcd/pkg/v3/schedule.(*job).Do()
      <autogenerated>:1 +0x29
  go.etcd.io/etcd/pkg/v3/schedule.(*fifo).executeJob()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:206 +0x101
  go.etcd.io/etcd/pkg/v3/schedule.(*fifo).run()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:187 +0x1a5
  go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler.func1()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:101 +0x39

Previous write at 0x00c002024043 by goroutine 65743:
  testing.tRunner.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1433 +0x7e4
  runtime.deferreturn()
      /usr/lib/go-1.19/src/runtime/panic.go:476 +0x32
  testing.(*T).Run.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1493 +0x47

Goroutine 65745 (running) created at:
  go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:101 +0x3b6
  go.etcd.io/etcd/server/v3/storage/mvcc.NewStore()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore.go:111 +0x331
  go.etcd.io/etcd/server/v3/storage/mvcc.TestHashByRevValueLastRevision()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/hash_test.go:76 +0xa4
  testing.tRunner()
      /usr/lib/go-1.19/src/testing/testing.go:1446 +0x216
  testing.(*T).Run.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1493 +0x47

Goroutine 65743 (running) created at:
  testing.(*T).Run()
      /usr/lib/go-1.19/src/testing/testing.go:1493 +0x75d
  testing.runTests.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1846 +0x99
  testing.tRunner()
      /usr/lib/go-1.19/src/testing/testing.go:1446 +0x216
  testing.runTests()
      /usr/lib/go-1.19/src/testing/testing.go:1844 +0x7ec
  testing.(*M).Run()
      /usr/lib/go-1.19/src/testing/testing.go:1726 +0xa84
  main.main()
      _testmain.go:265 +0x2e9
==================
```

The schedule for compact is handled asynchronously and it might use
`t.Logf` after go-test marks the case is done. And there is a comment
from go-test:

```go
// https://github.com/golang/go/blob/c69ff3a7d0c8bd2878662034c1cbce8613fa6f13/src/testing/testing.go#LL1580C3-L1582C16
                // Do not lock t.done to allow race detector to detect race in case
		// the user does not appropriately synchronize a goroutine.
		t.done = true
```

We need to ensure that all the goroutines should be closed before case
finish.

REF:

[1]: https://github.com/etcd-io/etcd/actions/runs/4312405975/jobs/7522924734

[2]: gotestyourself/gotestsum#310

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Mar 18, 2023
1 parent 3717448 commit 6af1c93
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 50 deletions.
9 changes: 6 additions & 3 deletions server/storage/mvcc/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (
// output which would have catastrophic consequences. Expected output is just
// hardcoded, so please regenerate it every time you change input parameters.
func TestHashByRevValue(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

var totalRevisions int64 = 1210
assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions)
Expand Down Expand Up @@ -72,8 +73,9 @@ func TestHashByRevValue(t *testing.T) {
}

func TestHashByRevValueLastRevision(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

var totalRevisions int64 = 1210
assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions)
Expand Down Expand Up @@ -131,8 +133,9 @@ func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash {
// TestCompactionHash tests compaction hash
// TODO: Change this to fuzz test
func TestCompactionHash(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

testutil.TestCompactionHash(context.Background(), t, hashTestCase{s}, s.cfg.CompactionBatchLimit)
}
Expand Down
9 changes: 8 additions & 1 deletion server/storage/mvcc/kvstore_compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ func TestScheduleCompaction(t *testing.T) {
func TestCompactAllAndRestore(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer func() {
b.Close()
os.Remove(tmpPath)
}()

s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
Expand Down Expand Up @@ -143,4 +146,8 @@ func TestCompactAllAndRestore(t *testing.T) {
if err != nil {
t.Errorf("unexpect range error %v", err)
}
err = s1.Close()
if err != nil {
t.Fatal(err)
}
}
7 changes: 4 additions & 3 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func TestRestoreDelete(t *testing.T) {
func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
tests := []string{"recreate", "restore"}
for _, test := range tests {
b, _ := betesting.NewDefaultTmpBackend(t)
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
Expand Down Expand Up @@ -527,9 +527,10 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
time.Sleep(100 * time.Millisecond)
continue
}
// FIXME(fuweid): it doesn't test restore one?
return
}

cleanup(s, b, tmpPath)
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
}
}
Expand Down Expand Up @@ -705,7 +706,7 @@ func TestTxnPut(t *testing.T) {
func TestConcurrentReadNotBlockingWrite(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer os.Remove(tmpPath)
defer cleanup(s, b, tmpPath)

// write something to read later
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
Expand Down
70 changes: 27 additions & 43 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package mvcc
import (
"bytes"
"fmt"
"os"
"reflect"
"sync"
"testing"
Expand All @@ -34,20 +33,16 @@ import (
func TestWatch(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
b.Close()
s.Close()
os.Remove(tmpPath)
}()
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
w.Watch(0, testKey, nil, 0)
defer w.Close()

w.Watch(0, testKey, nil, 0)
if !s.synced.contains(string(testKey)) {
// the key must have had an entry in synced
t.Errorf("existence = false, want true")
Expand All @@ -57,18 +52,16 @@ func TestWatch(t *testing.T) {
func TestNewWatcherCancel(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

defer func() {
s.store.Close()
os.Remove(tmpPath)
}()
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
wt, _ := w.Watch(0, testKey, nil, 0)
defer w.Close()

wt, _ := w.Watch(0, testKey, nil, 0)
if err := w.Cancel(wt); err != nil {
t.Error(err)
}
Expand All @@ -94,12 +87,10 @@ func TestCancelUnsynced(t *testing.T) {
// to make the test not crash from assigning to nil map.
// 'synced' doesn't get populated in this test.
synced: newWatcherGroup(),
stopc: make(chan struct{}),
}

defer func() {
s.store.Close()
os.Remove(tmpPath)
}()
defer cleanup(s, b, tmpPath)

// Put a key so that we can spawn watchers on that key.
// (testKey in this test). This increases the rev to 1,
Expand All @@ -110,6 +101,7 @@ func TestCancelUnsynced(t *testing.T) {
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
defer w.Close()

// arbitrary number for watchers
watcherN := 100
Expand Down Expand Up @@ -146,18 +138,17 @@ func TestSyncWatchers(t *testing.T) {
store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
}

defer func() {
s.store.Close()
os.Remove(tmpPath)
}()
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)

w := s.NewWatchStream()
defer w.Close()

// arbitrary number for watchers
watcherN := 100
Expand Down Expand Up @@ -227,11 +218,8 @@ func TestSyncWatchers(t *testing.T) {
func TestWatchCompacted(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b, tmpPath)

defer func() {
s.store.Close()
os.Remove(tmpPath)
}()
testKey := []byte("foo")
testValue := []byte("bar")

Expand All @@ -246,8 +234,9 @@ func TestWatchCompacted(t *testing.T) {
}

w := s.NewWatchStream()
wt, _ := w.Watch(0, testKey, nil, compactRev-1)
defer w.Close()

wt, _ := w.Watch(0, testKey, nil, compactRev-1)
select {
case resp := <-w.Chan():
if resp.WatchID != wt {
Expand All @@ -264,17 +253,14 @@ func TestWatchCompacted(t *testing.T) {
func TestWatchFutureRev(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
b.Close()
s.Close()
os.Remove(tmpPath)
}()
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")

w := s.NewWatchStream()
defer w.Close()

wrev := int64(10)
w.Watch(0, testKey, nil, wrev)

Expand Down Expand Up @@ -317,6 +303,8 @@ func TestWatchRestore(t *testing.T) {
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
defer w.Close()

w.Watch(0, testKey, nil, rev-1)

time.Sleep(delay)
Expand Down Expand Up @@ -365,6 +353,8 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
// create a watcher with a future revision
// add to "synced" watcher group (startRev > s.store.currentRev)
w1 := s1.NewWatchStream()
defer w1.Close()

w1.Watch(0, testKey, nil, startRev)

// make "s2" ends up with a higher last revision
Expand Down Expand Up @@ -407,8 +397,7 @@ func TestWatchBatchUnsynced(t *testing.T) {
oldMaxRevs := watchBatchMaxRevs
defer func() {
watchBatchMaxRevs = oldMaxRevs
s.store.Close()
os.Remove(tmpPath)
cleanup(s, b, tmpPath)
}()
batches := 3
watchBatchMaxRevs = 4
Expand All @@ -419,6 +408,8 @@ func TestWatchBatchUnsynced(t *testing.T) {
}

w := s.NewWatchStream()
defer w.Close()

w.Watch(0, v, nil, 1)
for i := 0; i < batches; i++ {
if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
Expand Down Expand Up @@ -539,9 +530,7 @@ func TestWatchVictims(t *testing.T) {
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
b.Close()
s.Close()
os.Remove(tmpPath)
cleanup(s, b, tmpPath)
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
}()

Expand Down Expand Up @@ -616,12 +605,7 @@ func TestWatchVictims(t *testing.T) {
func TestStressWatchCancelClose(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
b.Close()
s.Close()
os.Remove(tmpPath)
}()
defer cleanup(s, b, tmpPath)

testKey, testValue := []byte("foo"), []byte("bar")
var wg sync.WaitGroup
Expand Down

0 comments on commit 6af1c93

Please sign in to comment.