-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bug fix for stale reads on server startup, take #2 #3154
Conversation
…eout instead of maxQueryTime, plus tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made a pass through and noted a few small things. This looks like it's getting pretty close!
agent/consul/rpc.go
Outdated
@@ -434,5 +434,31 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { | |||
func (s *Server) consistentRead() error { | |||
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now()) | |||
future := s.raft.VerifyLeader() | |||
return future.Error() | |||
err := future.Error() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is just scoped here I'd do:
if err := future.Error(); err != nil {
agent/consul/rpc.go
Outdated
return nil | ||
} | ||
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) | ||
deadline := time.Now().Add(jitter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this'll just wait one jitterFraction
of the overall RPCHoldTimeout
. If you set deadline to time.Now().Add(s.config.RPCHoldTimeout)
that should work. The jitter basically is the poll interval.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other jitter
loop calculates a new jitter
each time through, which randomizes the polling a little more, but I don't think that really matters. This won't be phase-aligned with anything else.
agent/consul/rpc.go
Outdated
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) | ||
deadline := time.Now().Add(jitter) | ||
|
||
for time.Now().After(deadline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be Before(deadline)
- I don't think this'll run through.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I was writing this for loop four different ways to play with it, and in one version now.after was a condition to break out of the loop, lame copy paste bug,
Working on a unit test around this block now
agent/consul/server.go
Outdated
@@ -141,6 +143,9 @@ type Server struct { | |||
// updated | |||
reconcileCh chan serf.Member | |||
|
|||
// used to track when the server is ready to serve consistent reads | |||
readyForConsistentReads uint32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd use int32
for this since there's no real reason to make it unsigned (as close as atomic gets to a plain old int
). I'd also put a comment here that atomic instructions are used to access this.
agent/consul/server.go
Outdated
@@ -16,6 +16,8 @@ import ( | |||
"sync" | |||
"time" | |||
|
|||
"sync/atomic" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should get moved up after sync
w/o the extra space (tools like goimports
will keep these sorted for you).
agent/consul/leader_test.go
Outdated
@@ -493,6 +497,9 @@ func TestLeader_LeftLeader(t *testing.T) { | |||
t.Fatalf("Should have a leader") | |||
} | |||
leader.Leave() | |||
if leader.isReadyForConsistentReads() { | |||
t.Fatalf("Expectected consistent read state to be false ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in "Expectected"
agent/consul/leader_test.go
Outdated
@@ -32,6 +32,10 @@ func TestLeader_RegisterMember(t *testing.T) { | |||
|
|||
testrpc.WaitForLeader(t, s1.RPC, "dc1") | |||
|
|||
if !s1.isReadyForConsistentReads() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a little weird up in this test, I'd move this to TestLeader_LeftLeader
where you check that this gets set to false already and make sure it gets set to true when the leader is the leader.
agent/consul/leader.go
Outdated
@@ -183,6 +183,9 @@ func (s *Server) establishLeadership() error { | |||
|
|||
s.startAutopilot() | |||
|
|||
// Set consistent read readiness state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can probably drop the comment here and on line 187 - the function names are clear enough.
agent/consul/rpc.go
Outdated
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / jitterFraction) | ||
deadline := time.Now().Add(jitter) | ||
|
||
for time.Now().After(deadline) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could cook up a unit test to exercise this where you wait for a leader and make sure that it reports ready for consistent reads and then call the function to reset this from the unit test. You could call this and make sure you get the error you expect saying it's not ready for consistent reads. You could then arrange for a goroutine to sleep for a bit and then set it back to 1, and have the test call this and make sure it eventually unblocks and returns no error.
Addressed all of the above, unit test is done as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - just a couple tiny nit things
agent/consul/rpc_test.go
Outdated
} | ||
|
||
if err != nil { | ||
t.Fatal("Expected server to be ready for consistent reads ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good to print the error in here.
agent/consul/server.go
Outdated
|
||
// Returns true if this server is ready to serve consistent reads | ||
func (s *Server) isReadyForConsistentReads() bool { | ||
return atomic.LoadInt32(&s.readyForConsistentReads) > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this didn't end up as a counter I'd do == 1
.
|
||
//set some time to wait for the goroutine above to finish | ||
waitUntil := time.Now().Add(time.Millisecond * 5) | ||
err := s.consistentRead() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could move the check from line 182 here to make sure we get that error initially.
Uses an atomic integer to track when leader is ready to serve consistent reads, and uses RPCHoldTime to wait for ready state.