Skip to content

Commit

Permalink
tests: fix TestRaft_SnapshotRestore_PeerChange (#392)
Browse files Browse the repository at this point in the history
* This test was blocked on a follower writing the RPC response back into
the response channel. Fixed by making it a buffered channel of size 1
and by changing the future to react to raft shutdown.
* add GOTRACEBACK to the CI environment and the Makefile test targets.
  • Loading branch information
hanshasselberg committed Mar 3, 2020
1 parent b78b39c commit b47bafa
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 26 deletions.
24 changes: 13 additions & 11 deletions .circleci/config.yml
Expand Up @@ -2,18 +2,19 @@ version: 2.1

references:
images:
go-1.11: &GOLANG_1_11_IMAGE circleci/golang:1.11
go-1.12: &GOLANG_1_12_IMAGE circleci/golang:1.12
go-1.13: &GOLANG_1_13_IMAGE circleci/golang:1.13
go-1.14: &GOLANG_1_14_IMAGE circleci/golang:1.14

environment: &ENVIRONMENT
TEST_RESULTS_DIR: &TEST_RESULTS_DIR /tmp/test-results # path to where test results are saved
GOTRACEBACK: 'all'
GO111MODULE: 'on'
GOMAXPROCS: 2

jobs:
go-fmt-and-vet:
docker:
- image: *GOLANG_1_12_IMAGE
- image: *GOLANG_1_14_IMAGE
steps:
- checkout

Expand Down Expand Up @@ -100,23 +101,24 @@ workflows:
jobs:
- go-fmt-and-vet
- go-test:
name: test go1.11
go-version: *GOLANG_1_11_IMAGE
name: test go1.13
go-version: *GOLANG_1_13_IMAGE
requires:
- go-fmt-and-vet
- go-test:
name: test go1.12
go-version: *GOLANG_1_12_IMAGE
name: test go1.14
go-version: *GOLANG_1_14_IMAGE
requires:
- go-fmt-and-vet
- go-test-32bit:
name: test go1.11 - 32bit
go-version: *GOLANG_1_11_IMAGE
name: test go1.13 - 32bit
go-version: *GOLANG_1_13_IMAGE
requires:
- go-fmt-and-vet
- go-test-32bit:
name: test go1.12 - 32 bit
go-version: *GOLANG_1_12_IMAGE
name: test go1.14 - 32bit
go-version: *GOLANG_1_14_IMAGE
requires:
- go-fmt-and-vet


4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -16,8 +16,8 @@ endif
TEST_RESULTS_DIR?=/tmp/test-results

test:
go test $(TESTARGS) -timeout=60s -race .
go test $(TESTARGS) -timeout=60s -tags batchtest -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -race .
GOTRACEBACK=all go test $(TESTARGS) -timeout=60s -tags batchtest -race .

integ: test
INTEG_TESTS=yes go test $(TESTARGS) -timeout=25s -run=Integ .
Expand Down
2 changes: 1 addition & 1 deletion inmem_transport.go
Expand Up @@ -159,7 +159,7 @@ func (i *InmemTransport) makeRPC(target ServerAddress, args interface{}, r io.Re
}

// Send the RPC over
respCh := make(chan RPCResponse)
respCh := make(chan RPCResponse, 1)
req := RPC{
Command: args,
Reader: r,
Expand Down
2 changes: 2 additions & 0 deletions raft.go
Expand Up @@ -982,6 +982,7 @@ func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error {
// Restore the snapshot into the FSM. If this fails we are in a
// bad state so we panic to take ourselves out.
fsm := &restoreFuture{ID: sink.ID()}
fsm.ShutdownCh = r.shutdownCh
fsm.init()
select {
case r.fsmMutateCh <- fsm:
Expand Down Expand Up @@ -1576,6 +1577,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {

// Restore snapshot
future := &restoreFuture{ID: sink.ID()}
future.ShutdownCh = r.shutdownCh
future.init()
select {
case r.fsmMutateCh <- future:
Expand Down
4 changes: 3 additions & 1 deletion raft_test.go
Expand Up @@ -208,7 +208,9 @@ func TestRaft_RecoverCluster(t *testing.T) {
c.EnsureSamePeers(t)
}
for applies := 0; applies < 20; applies++ {
runRecover(applies)
t.Run(fmt.Sprintf("%d applies", applies), func(t *testing.T) {
runRecover(applies)
})
}
}

Expand Down
25 changes: 14 additions & 11 deletions testing.go
Expand Up @@ -2,6 +2,7 @@ package raft

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -276,19 +277,14 @@ func (c *cluster) Close() {
// or a timeout occurs. It is possible to set a filter to look for specific
// observations. Setting timeout to 0 means that it will wait forever until a
// non-filtered observation is made.
func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan struct{} {
func (c *cluster) WaitEventChan(ctx context.Context, filter FilterFn) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
var timeoutCh <-chan time.Time
if timeout > 0 {
timeoutCh = time.After(timeout)
}
for {
select {
case <-timeoutCh:
case <-ctx.Done():
return

case o, ok := <-c.observationCh:
if !ok || filter == nil || filter(&o) {
return
Expand All @@ -304,11 +300,13 @@ func (c *cluster) WaitEventChan(filter FilterFn, timeout time.Duration) <-chan s
// observations. Setting timeout to 0 means that it will wait forever until a
// non-filtered observation is made or a test failure is signaled.
func (c *cluster) WaitEvent(filter FilterFn, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
eventCh := c.WaitEventChan(ctx, filter)
select {
case <-c.failedCh:
c.t.FailNow()

case <-c.WaitEventChan(filter, timeout):
case <-eventCh:
}
}

Expand All @@ -319,7 +317,9 @@ func (c *cluster) WaitForReplication(fsmLength int) {

CHECK:
for {
ch := c.WaitEventChan(nil, c.conf.CommitTimeout)
ctx, cancel := context.WithTimeout(context.Background(), c.conf.CommitTimeout)
defer cancel()
ch := c.WaitEventChan(ctx, nil)
select {
case <-c.failedCh:
c.t.FailNow()
Expand Down Expand Up @@ -415,14 +415,17 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventCh := c.WaitEventChan(ctx, filter)
select {
case <-c.failedCh:
c.t.FailNow()

case <-limitCh:
c.FailNowf("timeout waiting for stable %s state", s)

case <-c.WaitEventChan(filter, 0):
case <-eventCh:
c.logger.Debug("resetting stability timeout")

case t, ok := <-timer.C:
Expand Down

0 comments on commit b47bafa

Please sign in to comment.