Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix for stale reads on server startup, take #2 #3154

Merged
merged 4 commits into from
Jun 21, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agent/consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ func (s *Server) establishLeadership() error {

s.startAutopilot()

s.setConsistentReadReady()

return nil
}

Expand All @@ -199,6 +201,8 @@ func (s *Server) revokeLeadership() error {
return err
}

s.resetConsistentReadReady()

s.stopAutopilot()

return nil
Expand Down
6 changes: 6 additions & 0 deletions agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,13 @@ func TestLeader_LeftLeader(t *testing.T) {
if leader == nil {
t.Fatalf("Should have a leader")
}
if !leader.isReadyForConsistentReads() {
t.Fatalf("Expected leader to be ready for consistent reads ")
}
leader.Leave()
if leader.isReadyForConsistentReads() {
t.Fatalf("Expected consistent read state to be false ")
}
leader.Shutdown()
time.Sleep(100 * time.Millisecond)

Expand Down
27 changes: 26 additions & 1 deletion agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,5 +434,30 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
return future.Error()
if err := future.Error(); err != nil {
return err //fail fast if leader verification fails
}
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
if s.isReadyForConsistentReads() {
return nil
}
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction)
deadline := time.Now().Add(s.config.RPCHoldTimeout)

for time.Now().Before(deadline) {

select {
case <-time.After(jitter):
// Drop through and check before we loop again.

case <-s.shutdownCh:
return fmt.Errorf("shutdown waiting for leader")
}

if s.isReadyForConsistentReads() {
return nil
}
}

return structs.ErrNotReadyForConsistentReads
}
39 changes: 39 additions & 0 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,42 @@ func TestRPC_blockingQuery(t *testing.T) {
}
}
}

func TestReadyForConsistentReads(t *testing.T) {
dir, s := testServerWithConfig(t, func(c *Config) {
c.RPCHoldTimeout = 2 * time.Millisecond
})
defer os.RemoveAll(dir)
defer s.Shutdown()

testrpc.WaitForLeader(t, s.RPC, "dc1")

if !s.isReadyForConsistentReads() {
t.Fatal("Server should be ready for consistent reads")
}

s.resetConsistentReadReady()

if err := s.consistentRead(); err.Error() != "Not ready to serve consistent reads" {
t.Fatal("Server should NOT be ready for consistent reads")
}

setConsistentFunc := func() {
time.Sleep(2 * time.Millisecond)
s.setConsistentReadReady()
}

go setConsistentFunc()

//set some time to wait for the goroutine above to finish
waitUntil := time.Now().Add(time.Millisecond * 5)
err := s.consistentRead()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could move the check from line 182 here to make sure we get that error initially.

for time.Now().Before(waitUntil) && err != nil {
err = s.consistentRead()
}

if err != nil {
t.Fatal("Expected server to be ready for consistent reads ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to print the error in here.

}

}
19 changes: 19 additions & 0 deletions agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"reflect"
"strconv"
"sync"
"sync/atomic"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should get moved up after sync w/o the extra space (tools like goimports will keep these sorted for you).

"time"

"github.com/hashicorp/consul/acl"
Expand Down Expand Up @@ -141,6 +142,9 @@ type Server struct {
// updated
reconcileCh chan serf.Member

// used to track when the server is ready to serve consistent reads, updated atomically
readyForConsistentReads int32

// router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas.
router *servers.Router
Expand Down Expand Up @@ -1002,6 +1006,21 @@ func (s *Server) GetWANCoordinate() (*coordinate.Coordinate, error) {
return s.serfWAN.GetCoordinate()
}

// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
}

// Atomically reset readiness state flag on leadership revoke
func (s *Server) resetConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 0)
}

// Returns true if this server is ready to serve consistent reads
func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) > 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this didn't end up as a counter I'd do == 1.

}

// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
Expand Down
7 changes: 4 additions & 3 deletions agent/consul/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (
)

var (
ErrNoLeader = fmt.Errorf("No cluster leader")
ErrNoDCPath = fmt.Errorf("No path to datacenter")
ErrNoServers = fmt.Errorf("No known Consul servers")
ErrNoLeader = fmt.Errorf("No cluster leader")
ErrNoDCPath = fmt.Errorf("No path to datacenter")
ErrNoServers = fmt.Errorf("No known Consul servers")
ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
)

type MessageType uint8
Expand Down
4 changes: 3 additions & 1 deletion website/source/docs/internals/consensus.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ is an opaque binary blob). The leader then writes the entry to durable storage a
attempts to replicate to a quorum of followers. Once the log entry is considered
*committed*, it can be *applied* to a finite state machine. The finite state machine
is application specific; in Consul's case, we use
[BoltDB](https://github.com/boltdb/bolt) to maintain cluster state.
[MemDB](https://github.com/hashicorp/go-memdb) to maintain cluster state. Consul's writes
block until it is both _committed_ and _applied_. This achieves read after write semantics
when used with the [consistent](/api/index.html#consistent) mode for queries.

Obviously, it would be undesirable to allow a replicated log to grow in an unbounded
fashion. Raft provides a mechanism by which the current state is snapshotted and the
Expand Down