Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3/integration: test lease keepalive #4369

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package clientv3

import (
"errors"
"sync"
"time"

Expand All @@ -25,6 +26,10 @@ import (
"github.com/coreos/etcd/pkg/transport"
)

var (
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
)

// Client provides and manages an etcd v3 client session.
type Client struct {
// KV is the keyvalue API for the client's connection.
Expand Down Expand Up @@ -182,6 +187,10 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
// connection is established.
func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
var err error
if len(c.Endpoints()) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be better to put this in New instead of checking in the retrydialer every redial?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok will do.

return nil, ErrNoAvailableEndpoints
}

for _, ep := range c.Endpoints() {
conn, curErr := c.Dial(ep)
if curErr != nil {
Expand Down
111 changes: 110 additions & 1 deletion clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package integration

import (
"testing"
"time"

"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/clientv3"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestLeaseCreate(t *testing.T) {
func TestLeaseRevoke(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

lapi := clientv3.NewLease(clus.RandClient())
Expand All @@ -73,3 +74,111 @@ func TestLeaseRevoke(t *testing.T) {
t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound)
}
}

func TestLeaseKeepAliveOnce(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

lapi := clientv3.NewLease(clus.RandClient())
defer lapi.Close()

resp, err := lapi.Create(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}

_, err = lapi.KeepAliveOnce(context.Background(), lease.LeaseID(resp.ID))
if err != nil {
t.Errorf("failed to keepalive lease", err)
}
}

func TestLeaseKeepAlive(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

lapi := clientv3.NewLease(clus.RandClient())

resp, err := lapi.Create(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}

rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID))
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}

kresp, ok := <-rc
if !ok {
t.Errorf("chan is closed, want not closed")
}

if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test the channel closes on Close()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. will do.


lapi.Close()

_, ok = <-rc
if ok {
t.Errorf("chan is not closed, want lease Close() closes chan")
}
}

// TODO: add a client that can connect to all the members of cluster via unix sock.
// TODO: test handle more complicated failures.
func TestLeaseKeepAliveHandleFailure(t *testing.T) {
t.Skip("test it when we have a cluster client")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

// TODO: change this line to get a cluster client
lapi := clientv3.NewLease(clus.RandClient())

resp, err := lapi.Create(context.Background(), 10)
if err != nil {
t.Errorf("failed to create lease %v", err)
}

rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID))
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}

kresp := <-rc
if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}

// restart the connected member.
clus.Members[0].Stop(t)

select {
case <-rc:
t.Fatalf("unexpected keepalive")
case <-time.After(10*time.Second/3 + 1):
}

// recover the member.
clus.Members[0].Restart(t)

kresp = <-rc
if kresp.ID != resp.ID {
t.Errorf("ID = %x, want %x", kresp.ID, resp.ID)
}

lapi.Close()

_, ok := <-rc
if ok {
t.Errorf("chan is not closed, want lease Close() closes chan")
}
}
19 changes: 19 additions & 0 deletions clientv3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func (l *lessor) Close() error {

l.stopCancel()
l.stream = nil

return nil
}

Expand Down Expand Up @@ -213,11 +214,25 @@ func (l *lessor) recvKeepAliveLoop() {
return
}

defer func() {
l.mu.Lock()
defer l.mu.Unlock()

for _, ch := range l.keepAlives {
close(ch)
}
}()

for {
stream := l.getKeepAliveStream()

resp, err := stream.Recv()
if err != nil {
if isRPCError(err) {
l.Close()
return
}

err = l.switchRemoteAndStream(err)
if err != nil {
l.Close()
Expand Down Expand Up @@ -280,6 +295,10 @@ func (l *lessor) sendKeepAliveLoop() {
r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
err = stream.Send(r)
if err != nil {
if isRPCError(err) {
l.Close()
return
}
break
}
}
Expand Down