grpc lock service #7444
Conversation
    body: "*"
  };
}
rpc Unlock(UnlockRequest) returns (UnlockResponse) {
Can we add a verify or check interface, so a user can verify whether it is still the lock owner, or whether another component still owns the lock?
The key is only returned on lock acquire. So long as Key exists with create rev <= response.header.revision (for most cases you'll only need the exists check), the caller holds the lock. That way it can interact with Txns safely.
I think this only needs some documentation, not a full RPC.
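For illustration, a minimal sketch of that check (not part of this patch; `lockKey` is assumed to be the key from a prior LockResponse, and `guarded-key` is a hypothetical key being protected):

```go
package lockutil

import (
	"context"

	clientv3 "github.com/coreos/etcd/clientv3"
)

// putIfStillOwner commits a write only while the lock key still exists.
// CreateRevision(k) > 0 is the idiomatic "key exists" test inside a Txn.
func putIfStillOwner(ctx context.Context, cli *clientv3.Client, lockKey []byte) (bool, error) {
	resp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(string(lockKey)), ">", 0)).
		Then(clientv3.OpPut("guarded-key", "value")).
		Commit()
	if err != nil {
		return false, err
	}
	// Succeeded == false means the lock key is gone: ownership was lost
	// and the Put was not applied.
	return resp.Succeeded, nil
}
```

Because the existence check and the guarded write go through one Txn, the write can never land after the lock has been lost.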
Let's say we want a lock with the name mylock; the returned key will be mylock/001. Then the client can check whether mylock/abc still exists to see if the lock is still held by the owner? Do we have a guarantee that the lock key is unique?
It's keyed by lease so it will be unique for distinct leases.
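For context, a sketch of how the concurrency package derives that key (an assumption based on clientv3/concurrency, not quoted from this thread): the mutex key is the prefix plus the session's lease ID in hex, so distinct leases can never collide:

```go
// pfx is the lock name prefix; s is the *concurrency.Session.
// Distinct leases => distinct keys under the same prefix.
myKey := fmt.Sprintf("%s%x", pfx, s.Lease())
```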
OK, this should work. I agree it is just a doc issue. The client can build a utility func to provide a better interface to end users, though.
if err != nil {
	return nil, err
}
s.Orphan()
why do we need to call Orphan here?
Otherwise the server will keep the lease alive indefinitely.
An alternative to putting it here is defer s.Orphan(), so the server keeps the lease alive until the lock is acquired and the client doesn't have to worry about keepalives. The problem with that is there's no guarantee how much time the lease will have left after the acquire. This could be solved by issuing a keepalive before returning from the RPC.
Another option is to make leases optional and have an additional TTL field, so the server can grant a lease and return a lock held for at least the given TTL.
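A sketch of that defer alternative (hypothetical; `cli`, `leaseID`, and `req` mirror the surrounding server code but are assumptions here, not the patch itself):

```go
s, err := concurrency.NewSession(cli, concurrency.WithLease(leaseID))
if err != nil {
	return nil, err
}
// Keep the server refreshing the lease while Lock blocks, and stop once
// the RPC returns (acquired or not) so the lease isn't kept alive forever.
defer s.Orphan()

m := concurrency.NewMutex(s, string(req.Name))
if err := m.Lock(ctx); err != nil {
	return nil, err
}
// Caveat from the discussion: no guarantee how much TTL remains on the
// lease here; issuing a keepalive before returning would address that.
```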
etcdserver/api/v3client/v3client.go
Outdated
)

func New(s *etcdserver.EtcdServer) *clientv3.Client {
	c := clientv3.NewCtxClient(context.TODO())
use context.Background instead?
clientv3/client.go
Outdated
@@ -77,6 +77,11 @@ func New(cfg Config) (*Client, error) {
	return newClient(&cfg)
}

func NewCtxClient(ctx context.Context) *Client {
Add a comment on this Client? Probably document that this client is intended for internal usage?
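One possible shape for that comment (the wording is only a suggestion, and the function body is an assumption sketched for completeness rather than quoted from the diff):

```go
// NewCtxClient creates a client with only a context and no gRPC connection
// or endpoints. It is intended for internal etcd usage, e.g. presenting an
// embedded server through the client API; external callers should use New.
func NewCtxClient(ctx context.Context) *Client {
	cctx, cancel := context.WithCancel(ctx)
	return &Client{ctx: cctx, cancel: cancel}
}
```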
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;

service Lock {
we need to document this proto file (service + messages)
LGTM in general. @fanminshi please take a look.
Force-pushed from 553d8c2 to 62d653d.
Had to slip in a slight grpcproxy restructure to break a cyclic dependency introduced by the cluster autosyncing patch; the client<->server grpc adapters now live in proxy/grpcproxy/adapter.
clientv3/concurrency/mutex.go
Outdated
}

func NewMutex(s *Session, pfx string) *Mutex {
-	return &Mutex{s, pfx + "/", "", -1}
+	return &Mutex{s, pfx + "/", "", -1, nil}
}

// Lock locks the mutex with a cancellable context. If the context is cancelled
cancellable => cancelable?
This isn't part of the patch, but OK...
clientv3/concurrency/key.go
Outdated
"github.com/coreos/etcd/mvcc/mvccpb" | ||
"golang.org/x/net/context" | ||
) | ||
|
||
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { | ||
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) (*pb.ResponseHeader, error) { |
The *pb.ResponseHeader is not used by waitDeletes(), and it seems like it's not used by anything else. Am I missing something?
yeah this can be reverted
}

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
-func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error {
+func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
Could you explain the rationale for the for loop here? Thanks.
It loops because waitDelete will return after sensing one delete; there is no guarantee that all keys within the revision range will be deleted by that time. This code is a little old and the API is a bit stronger now, so this could possibly be optimized to be single-shot on the watcher, but I don't want to change it in this patch.
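Putting the hunks in this thread together, the loop reads roughly like this (a reconstructed sketch, with `v3` = clientv3 and `pb` = etcdserverpb as imported in key.go):

```go
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
		// Find the newest key under pfx created at or before maxCreateRev.
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
		// Nothing left in the revision range: all earlier waiters are gone.
		if len(resp.Kvs) == 0 {
			return resp.Header, nil
		}
		// waitDelete returns after observing a single delete event, so
		// loop and re-check rather than assume the whole range is clear.
		lastKey := string(resp.Kvs[0].Key)
		if _, werr := waitDelete(ctx, client, lastKey, resp.Header.Revision); werr != nil {
			return nil, werr
		}
	}
}
```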
clientv3/concurrency/key.go
Outdated
	}
	lastKey := string(resp.Kvs[0].Key)
-	if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
-		return err
+	if _, werr := waitDelete(ctx, client, lastKey, resp.Header.Revision); werr != nil {
Is the resp.Header.Revision here the revision from when lastKey was created, or the revision when the Get() request finished?
It's the revision the Get sees. The create revision for lastKey is already in Kvs[0].
Force-pushed from 62d653d to eaefc33.
Codecov Report
@@            Coverage Diff            @@
##             master     #7444   +/-  ##
=========================================
  Coverage          ?    71.12%
=========================================
  Files             ?       241
  Lines             ?     21347
  Branches          ?         0
=========================================
  Hits              ?     15184
  Misses            ?      5033
  Partials          ?      1130

Continue to review full report at Codecov.
message LockResponse {
	etcdserverpb.ResponseHeader header = 1;
	// key is a key that will exist on etcd for the duration
	// that the Lock caller owns the lock.
Add a warning that users should not modify this key with the KV API, or the lock might have undefined behavior?
LGTM
lgtm. Thanks!
Break cyclic dependency: clientv3/naming <-> integration <-> v3client <-> grpcproxy <-> clientv3/naming
For creating client from etcdserver.
Force-pushed from eaefc33 to 300323f.
Fixes #6285