diff --git a/storage/persistence/etcd/etcd.go b/storage/persistence/etcd/etcd.go index a73f136d..399e8316 100644 --- a/storage/persistence/etcd/etcd.go +++ b/storage/persistence/etcd/etcd.go @@ -68,6 +68,7 @@ type Etcd struct { isReady atomic.Bool quitCh chan struct{} + wg sync.WaitGroup electionCh chan *concurrency.Election leaderChangeCh chan bool } @@ -120,6 +121,7 @@ func New(id string, cfg *Config) (*Etcd, error) { leaderChangeCh: make(chan bool), } e.isReady.Store(false) + e.wg.Add(2) go e.electLoop(context.Background()) go e.observeLeaderEvent(context.Background()) return e, nil @@ -211,6 +213,7 @@ func (e *Etcd) List(ctx context.Context, prefix string) ([]persistence.Entry, er } func (e *Etcd) electLoop(ctx context.Context) { + defer e.wg.Done() for { select { case <-e.quitCh: @@ -249,6 +252,7 @@ func (e *Etcd) electLoop(ctx context.Context) { } func (e *Etcd) observeLeaderEvent(ctx context.Context) { + defer e.wg.Done() var election *concurrency.Election select { case elect := <-e.electionCh: @@ -287,5 +291,6 @@ func (e *Etcd) observeLeaderEvent(ctx context.Context) { func (e *Etcd) Close() error { close(e.quitCh) + e.wg.Wait() return e.client.Close() } diff --git a/storage/persistence/etcd/etcd_test.go b/storage/persistence/etcd/etcd_test.go index 61caee6b..e8e72946 100644 --- a/storage/persistence/etcd/etcd_test.go +++ b/storage/persistence/etcd/etcd_test.go @@ -40,6 +40,11 @@ func TestBasicOperations(t *testing.T) { }) require.NoError(t, err) defer persist.Close() + go func() { + for range persist.LeaderChange() { + // do nothing + } + }() ctx := context.Background() keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"} @@ -82,7 +87,6 @@ func TestElect(t *testing.T) { return node1.Leader() == node0.myID }, 10*time.Second, 100*time.Millisecond, "node1's leader should be the node0") - shutdown := make(chan struct{}) go func() { for { select { @@ -90,8 +94,6 @@ func TestElect(t *testing.T) { // do nothing case <-node1.LeaderChange(): // do nothing - case <-shutdown: - return } } }() @@ -101,5 +103,4 @@ func TestElect(t *testing.T) { require.Eventuallyf(t, func() bool { return node1.Leader() == node1.myID }, 15*time.Second, 100*time.Millisecond, "node1 should be the leader") - close(shutdown) }