clientv3: initial kv order caching #8092
@@ -29,6 +29,7 @@ var (
	ErrGRPCDuplicateKey = grpc.Errorf(codes.InvalidArgument, "etcdserver: duplicate key given in txn request")
	ErrGRPCCompacted = grpc.Errorf(codes.OutOfRange, "etcdserver: mvcc: required revision has been compacted")
	ErrGRPCFutureRev = grpc.Errorf(codes.OutOfRange, "etcdserver: mvcc: required revision is a future revision")
	ErrGRPCPastRev = grpc.Errorf(codes.FailedPrecondition, "etcdserver: required response revision to be higher than previous revisions")
please remove these errors, this file is only meant for errors sent out by the etcd server over grpc (and nothing in this patch uses the errors anyway)
clientv3/ordering/kv.go
Outdated

type kvOrderingCache struct {
	clientv3.KV
	switchEndpoint SwitchEndpointClosure
The endpoint switching shouldn't be baked into the type name; it's a policy decision which can be left to the function. Try type OrderViolationFunc func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64)
Alright, that makes sense. Regardless of how endpoint switching and the retry rate are implemented in the OrderViolationFunc, the Get method should still keep looping and calling the kv.orderViolation method, right? So I was thinking of using:
r, err := kv.KV.Do(ctx, clientv3.OpGet(key, opts...))
currRev := r.Get().Header.Revision
for err != nil && prevRev > currRev {
kv.orderViolation(op, resp, prevRev)
r, err = kv.KV.Do(ctx, clientv3.OpGet(key, opts...))
}
Does the above look right?
idea is OK, but needs to fetch the prev rev and update it:
op := clientv3.OpGet(key, opts...)
lastRev := kv.prevRev() // safely get prev rev w/ atomic or rwmutex
for {
	r, err := kv.KV.Do(ctx, op)
	if err != nil {
		return nil, err
	}
	resp := r.Get()
	if resp.Header.Revision >= lastRev {
		kv.update(resp.Header.Revision) // safely update prev rev
		return resp, nil
	}
	kv.orderViolation(op, r, lastRev)
}
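For reference, a self-contained sketch of the loop suggested above. It does not use the real clientv3 types: `response`, `backend`, `fakeCluster`, and `orderedKV` are hypothetical stand-ins, and the violation handler simply moves to the next simulated member. It only illustrates the shape of "snapshot the previous revision, retry until the response is not older than it".

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// response is a hypothetical stand-in for a clientv3 response; only the
// header revision matters for the ordering check.
type response struct {
	Revision int64
	Value    string
}

// backend is a hypothetical stand-in for the wrapped clientv3.KV.
type backend interface {
	Get(key string) (*response, error)
}

// fakeCluster answers every Get from whichever member is currently selected.
type fakeCluster struct {
	members []response
	current int
}

func (c *fakeCluster) Get(key string) (*response, error) {
	if c.current >= len(c.members) {
		return nil, errors.New("no such member")
	}
	r := c.members[c.current]
	return &r, nil
}

// orderedKV remembers the highest revision it has handed back so far.
type orderedKV struct {
	backend
	prevRev        int64
	orderViolation func(prevRev int64)
}

func (kv *orderedKV) Get(key string) (*response, error) {
	// Snapshot the previous revision before issuing the request, so a
	// concurrent caller updating prevRev cannot change this comparison.
	lastRev := atomic.LoadInt64(&kv.prevRev)
	for {
		resp, err := kv.backend.Get(key)
		if err != nil {
			return nil, err
		}
		if resp.Revision >= lastRev {
			// A plain store is enough for this single-goroutine sketch.
			atomic.StoreInt64(&kv.prevRev, resp.Revision)
			return resp, nil
		}
		kv.orderViolation(lastRev)
	}
}

func main() {
	cluster := &fakeCluster{members: []response{
		{Revision: 3, Value: "stale"},
		{Revision: 4, Value: "stale"},
		{Revision: 7, Value: "fresh"},
	}}
	kv := &orderedKV{backend: cluster, prevRev: 5}
	kv.orderViolation = func(prevRev int64) { cluster.current++ }

	resp, err := kv.Get("foo")
	fmt.Println(resp.Value, resp.Revision, err) // fresh 7 <nil>
}
```

The two stale members are skipped one after the other, which is also why a single retry would not be enough.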
clientv3/ordering/kv.go
Outdated
}

func (kv *kvOrderingCache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
	if len(key) == 0 {
I don't think this is necessary; the namespace code does a check here to avoid prefixing an empty key and having the request succeed when it shouldn't
clientv3/ordering/kv.go
Outdated
	if len(key) == 0 {
		return nil, rpctypes.ErrEmptyKey
	}
	r, err := kv.KV.Do(ctx, clientv3.OpGet(key, opts...))
get the previous revision before submitting the RPC; otherwise, this breaks with concurrent accesses. Example: req1 has prevRev=1, req2 has prevRev=1, req2 returns and sets prevRev=2, req1 returns and uses prevRev=2 when it should be comparing with prevRev=1
req1 has prevRev=1, req2 has prevRev=1, req2 returns and sets prevRev=2, req1 returns and uses prevRev=2 when it should be comparing with prevRev=1
In your example, is the same client sending out req1 and req2 in some sort of asynchronous fashion (maybe through separate goroutines)?
If so, maybe I can just store kv.prevRev in a local variable at the time of sending req1 and then check the returned rev of res1 against that local variable, so that the comparison is not affected by the changes to prevRev that req2 might cause.
Yeah, the requests could come in from separate goroutines (e.g., proxy serves each RPC request in a separate go-routine but uses only one client). A local variable would handle it.
clientv3/ordering/kv.go
Outdated
	res := r.Get()
	if kv.prevRev > res.Header.Revision {
		kv.switchEndpoint()
		kv.Get(ctx, key, opts...)
this ignores the get response
clientv3/ordering/kv.go
Outdated
		kv.switchEndpoint()
		kv.Get(ctx, key, opts...)
	} else {
		kv.prevRev = res.Header.Revision
kv.prevRev either needs to be protected with a mutex or with atomics; otherwise there will be data races
clientv3/ordering/kv.go
Outdated
		return nil, err
	}
	res := r.Get()
	if kv.prevRev > res.Header.Revision {
There needs to be some kind of looping here in case the reconnect goes to another outdated server. For example, a 5 node cluster can have two out-dated members, so switching from one to the other could still give an out of order result.
clientv3/ordering/kv.go
Outdated
		_, ok := ((*resOp).Response).(*pb.ResponseOp_ResponseRange)
		if ok && txn.kv.prevRev > resp.Header.Revision {
			txn.kv.switchEndpoint()
			txn.Commit()
what happens to the server response?
clientv3/ordering/kv.go
Outdated
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)

type kvOrderingCache struct {
just kvOrdering? this doesn't behave like a cache
clientv3/ordering/kv.go
Outdated
}

func (kv *kvOrdering) setPrevRev(currRev int64) {
	atomic.StoreInt64(&kv.prevRev, currRev)
Probably want to prevent this from going backwards if there are concurrent calls to setPrevRev:
for {
	pr := kv.getPrevRev()
	if currRev <= pr {
		return
	}
	if atomic.CompareAndSwapInt64(&kv.prevRev, pr, currRev) {
		return
	}
}
Hadn't considered that. Good point.
clientv3/ordering/util.go
Outdated

type KvOrderViolationFunc func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64)

type TxnOrderViolationFunc func(resp *clientv3.TxnResponse, prevRev int64)
This shouldn't be necessary as of 9cb12de; txns can be wrapped as OpRequest/OpResponse
I don't think I can wrap the TxnResponse because OpResponse does not export any of its fields. clientv3.OpResponse{txn: resp} won't work.
Build an OpTxn on Commit, then issue the RPC with Do to get an OpResponse?
Since I couldn't get the Txn's cmps, thenOps, and elseOps required to construct the OpTxn through the Txn interface, I created a wrapper implementation around the If, Then, and Else to store the cmps, thenOps, and elseOps. Is this approach the cleanest or is there another more intuitive way?
the wrapper approach is OK
clientv3/ordering/kv.go
Outdated
		return nil, err
	}
	var ordered bool = true
	for _, resOp := range resp.Responses {
only need to check resp.Header.Revision; no need to iterate through the responses
Just for reference, how is the resp.Header.Revision determined? Are the individual responses involved in the process of determining the resp.Header.Revision?
The header revision is the etcd store revision at the time of servicing the RPC. Since txns are atomic, there only needs to be one header revision (e.g., two puts in a txn will commit under the same revision)
A few last nits; looks almost ready to merge. Also, please squash into a single patch titled "ordering: add client ordering wrapper" so the commit history is tidy. Thanks!
clientv3/ordering/util.go
Outdated

type OrderViolationFunc func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64)

func switchEndpointClosure(c clientv3.Client) OrderViolationFunc {
func NewOrderViolationSwitchEndpoint(c clientv3.Client) so other packages can use this?
clientv3/ordering/kv_test.go
Outdated
		for _, r := range res.Responses {
			_, ok := ((*r).Response).(*pb.ResponseOp_ResponseRange)
			if rev := res.Header.Revision; ok && rev < tt.prevRev {
				t.Errorf("Response revision: %d was less than the previous revision: %d\n", rev, tt.prevRev)
idiomatic go would be closer to
t.Errorf("#%d.%d: expected revision %d, got %d", i, j, tt.prevRev, rev)
clientv3/ordering/kv_test.go
Outdated
		txn := &txnOrdering{mTxn, kv}
		res, err := txn.Commit()
		if err != nil {
			t.Errorf("For response: %+v, received the following error: %v\n", tt.response, err)
idiomatic go would be closer to
t.Errorf("#%d: expected response %+v, got error %+v", i, tt.response, err)
clientv3/ordering/kv_test.go
Outdated
}

func TestTxnOrdering(t *testing.T) {
	for _, tt := range txnTests {
Probably use the index for the error messages:
for i, tt := range txnTests {
In order to do any testing, I would have to mock the |
@mangoslicer maybe have functions like |
@heyitsanthony Also, what happens if the old member, which has the latest rev for a key, sets the prevRev and then goes down? Assuming that no other member in the cluster has a rev higher than the prevRev, would there be an infinite loop of calling |
So long as the cluster has quorum, at least one member will have the latest revision.
If the cluster doesn't have quorum, the request could loop forever. One way around this is to have OrderViolationFunc return an error so the user can provide a policy to terminate the looping.
It might be good to have a test like the ones in clientv3/integration that checks this works against a cluster instead of only mocks. The easiest way to do it is to create a cluster with 3 nodes. Create a client pointing to member 0. Put a key, get the key through clus.Client(1) so member 1 is known to have it, stop member 2, put the key again, and get with the ordering client. Then stop members 0 and 1 and restart member 2. Member 2 will have an outdated key. Switch the ordering client's endpoints to point to member 2. Try to get the key and confirm the ordering client detects the order violation with a test violation function.
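To make the "return an error to stop looping" idea concrete, here is a minimal sketch. The names `violationFunc`, `getOrdered`, and `errOrderViolation` are hypothetical and the fetch function stands in for the real RPC; the point is only that the retry loop ends as soon as the user-supplied policy returns a non-nil error.

```go
package main

import (
	"errors"
	"fmt"
)

// errOrderViolation is a hypothetical error a policy can return to give up.
var errOrderViolation = errors.New("ordering: revision went backwards")

// violationFunc is a hypothetical variant of OrderViolationFunc that
// returns an error; a non-nil result terminates the retry loop.
type violationFunc func(gotRev, prevRev int64) error

// getOrdered calls fetch until it yields a revision >= prevRev, or until
// the violation policy decides to stop.
func getOrdered(fetch func() (int64, error), prevRev int64, onViolation violationFunc) (int64, error) {
	for {
		rev, err := fetch()
		if err != nil {
			return 0, err
		}
		if rev >= prevRev {
			return rev, nil
		}
		if err := onViolation(rev, prevRev); err != nil {
			return 0, err
		}
	}
}

func main() {
	// Simulates a member stuck at revision 2 that can never catch up.
	stale := func() (int64, error) { return 2, nil }

	attempts := 0
	giveUpAfter3 := func(gotRev, prevRev int64) error {
		attempts++
		if attempts >= 3 {
			return errOrderViolation
		}
		return nil
	}

	_, err := getOrdered(stale, 5, giveUpAfter3)
	fmt.Println(attempts, err) // 3 ordering: revision went backwards
}
```

A test violation function for the integration scenario above could return the error on the first call, so the test only has to check that the error surfaces from Get.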
func (resp *TxnResponse) ToOpResponse() OpResponse {
	return OpResponse{txn: resp}
}
include PutResponse and DelResponse for completeness?
clientv3/ordering/kv.go
Outdated
	txn.mu.Lock()
	txn.cmps = cs
	txn.mu.Unlock()
	return txn.Txn.If(cs...)
this should be
txn.mu.Lock()
defer txn.mu.Unlock()
txn.cmps = cs
txn.Txn = txn.Txn.If(cs...)
return txn
otherwise Commit will use txn.Txn.Commit instead of txnOrdering.Commit
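The difference is easy to see with a stripped-down sketch. The `Txn` interface below is a hypothetical two-method stand-in, not the real clientv3.Txn, and the mutex is left out for brevity. `leakyIf` mirrors the original code by returning the inner transaction, while `If` returns the wrapper as suggested.

```go
package main

import "fmt"

// Txn is a hypothetical, trimmed-down stand-in for the clientv3.Txn interface.
type Txn interface {
	If(cmps ...string) Txn
	Commit() string
}

// baseTxn plays the role of the wrapped transaction.
type baseTxn struct{}

func (t *baseTxn) If(cmps ...string) Txn { return t }
func (t *baseTxn) Commit() string        { return "base commit" }

// orderingTxn wraps a Txn and records the comparisons it was given.
type orderingTxn struct {
	Txn
	cmps []string
}

// leakyIf returns the inner Txn, so a chained Commit bypasses the wrapper.
func (t *orderingTxn) leakyIf(cmps ...string) Txn {
	t.cmps = cmps
	return t.Txn.If(cmps...)
}

// If stores the inner result and returns the wrapper, so a chained Commit
// still reaches orderingTxn.Commit.
func (t *orderingTxn) If(cmps ...string) Txn {
	t.cmps = cmps
	t.Txn = t.Txn.If(cmps...)
	return t
}

func (t *orderingTxn) Commit() string { return "ordering commit" }

func main() {
	leaky := &orderingTxn{Txn: &baseTxn{}}
	fmt.Println(leaky.leakyIf("a").Commit()) // base commit

	fixed := &orderingTxn{Txn: &baseTxn{}}
	fmt.Println(fixed.If("a").Commit()) // ordering commit
}
```

The same reasoning applies to Then and Else, since callers normally chain all three before Commit.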
clientv3/ordering/kv.go
Outdated
	txn.mu.Lock()
	txn.thenOps = ops
	txn.mu.Unlock()
	return txn.Txn.Then(ops...)
txn.mu.Lock()
defer txn.mu.Unlock()
txn.thenOps = ops
txn.Txn = txn.Txn.Then(ops...)
return txn
clientv3/ordering/kv.go
Outdated
	txn.mu.Lock()
	txn.elseOps = ops
	txn.mu.Unlock()
	return txn.Txn.Else(ops...)
txn.mu.Lock()
defer txn.mu.Unlock()
txn.elseOps = ops
txn.Txn = txn.Txn.Else(ops...)
return txn
clientv3/ordering/kv.go
Outdated
	// middle of the Commit operation.
	prevRev := txn.getPrevRev()
	for {
		opTxn := clientv3.OpTxn(txn.cmps, txn.thenOps, txn.elseOps)
move this before the for {} since it never changes in the loop?
After I kill the 0th and 1st member, how is the 2nd member supposed to become the leader? It seems that after I try to kill the first two members, the third member keeps requesting votes but it doesn't receive votes from the stopped nodes, so the term keeps increasing without a leader. As a result, the Range request doesn't complete and the unit test times out. I tried using
Since the signature of the Get method can not be changed from |
@mangoslicer the point of the test isn't to get a new leader, it's to get a member that's behind and check that the ordering kv detects the outdated revision. The OrderViolationFunc supplied by the test would return an error and that would be returned by the client so there's no need to elect a new leader to make forward progress.
I added unit tests for the ordering wrapper and for the endpoint-switching code. There are also two changes that I made to the endpoint-switching code. One of them is that the endpoint-switching code returns an error if it switches through all of the endpoints and is not able to find a higher rev. I thought that this change would prevent the infinite loop that would occur if the member that set the prevRev crashes before replicating the write log to members with lower revs. The other change I made was a one second time out added after setting the endpoint of the client to a specific member, because while unit-testing, I found that without some sort of delay, the |
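A rough sketch of the first change, under several assumptions: `endpointSetter`, `fakeClient`, `newSwitchEndpoint`, and `errNoGreaterRev` are hypothetical names, the one second delay is omitted, and the counter is neither goroutine-safe nor reset after a successful read. It only shows a handler that pins the client to each endpoint in turn and reports an error once every endpoint has been tried.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoGreaterRev is a hypothetical error for "every endpoint was tried".
var errNoGreaterRev = errors.New("ordering: no endpoint has a high enough revision")

// endpointSetter is a hypothetical stand-in for the part of the client
// that can be pinned to a given endpoint.
type endpointSetter interface {
	SetEndpoints(eps ...string)
}

// fakeClient records the endpoint it was last pinned to.
type fakeClient struct{ current string }

func (c *fakeClient) SetEndpoints(eps ...string) { c.current = eps[0] }

// newSwitchEndpoint returns a violation handler that pins the client to the
// next endpoint on each call and fails once all endpoints have been tried.
func newSwitchEndpoint(c endpointSetter, endpoints []string) func() error {
	tried := 0
	return func() error {
		if tried >= len(endpoints) {
			return errNoGreaterRev
		}
		c.SetEndpoints(endpoints[tried])
		tried++
		return nil
	}
}

func main() {
	c := &fakeClient{}
	onViolation := newSwitchEndpoint(c, []string{"m0:2379", "m1:2379"})
	for i := 0; i < 3; i++ {
		err := onViolation()
		fmt.Println(c.current, err)
	}
}
```

The first two calls succeed and move the client to m0 and then m1; the third call leaves the client on m1 and returns the error, which is what ends the retry loop.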
@mangoslicer OK this is almost ready to merge, but CI is failing. Segfault on semaphore (https://semaphoreci.com/coreos/etcd/branches/pull-request-8092/builds/10).
Commit format checking is failing on travis. The fix would be squashing the patches into a single commit titled Thanks! |
6f0b938
to
887df72
Compare
lgtm thanks
@mangoslicer Thanks! |
#7623