From 1d13cb442a1d5447fd09a655be2846f679048d5a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 1 Feb 2016 13:48:54 -0800 Subject: [PATCH 1/4] clientv3/integration: test lease keepalive --- clientv3/integration/lease_test.go | 95 +++++++++++++++++++++++++++++- 1 file changed, 94 insertions(+), 1 deletion(-) diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index c44fafcbd13..96e7d632641 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -16,6 +16,7 @@ package integration import ( "testing" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/clientv3" @@ -50,7 +51,7 @@ func TestLeaseCreate(t *testing.T) { func TestLeaseRevoke(t *testing.T) { defer testutil.AfterTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) lapi := clientv3.NewLease(clus.RandClient()) @@ -73,3 +74,95 @@ func TestLeaseRevoke(t *testing.T) { t.Fatalf("err = %v, want %v", err, v3rpc.ErrLeaseNotFound) } } + +func TestLeaseKeepAliveOnce(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lapi := clientv3.NewLease(clus.RandClient()) + defer lapi.Close() + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + _, err = lapi.KeepAliveOnce(context.Background(), lease.LeaseID(resp.ID)) + if err != nil { + t.Errorf("failed to keepalive lease", err) + } +} + +func TestLeaseKeepAlive(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + lapi := clientv3.NewLease(clus.RandClient()) + defer lapi.Close() + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID)) + if kerr != nil { + t.Errorf("failed to keepalive lease %v", kerr) + } + + kresp := <-rc + if kresp.ID != resp.ID { + t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) + } +} + +// TODO: add a client that can connect to all the members of cluster via unix sock. +// TODO: test handle more complicated failures. +func TestLeaseKeepAliveHandleFailure(t *testing.T) { + t.Skip("test it when we have a cluster client") + + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // TODO: change this line to get a cluster client + lapi := clientv3.NewLease(clus.RandClient()) + defer lapi.Close() + + resp, err := lapi.Create(context.Background(), 10) + if err != nil { + t.Errorf("failed to create lease %v", err) + } + + rc, kerr := lapi.KeepAlive(context.Background(), lease.LeaseID(resp.ID)) + if kerr != nil { + t.Errorf("failed to keepalive lease %v", kerr) + } + + kresp := <-rc + if kresp.ID != resp.ID { + t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) + } + + // restart the connected member. + clus.Members[0].Stop(t) + + select { + case <-rc: + t.Fatalf("unexpected keepalive") + case <-time.After(10*time.Second/3 + 1): + } + + // recover the member. + clus.Members[0].Restart(t) + + kresp = <-rc + if kresp.ID != resp.ID { + t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) + } +} From 5553591b76ecf1998ade41c17bb5ff5d25778d16 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 1 Feb 2016 16:30:10 -0800 Subject: [PATCH 2/4] clientv3: return ErrNoAvailableEndpoints --- clientv3/client.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/clientv3/client.go b/clientv3/client.go index 3139a047efa..5dd35720a78 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -15,6 +15,7 @@ package clientv3 import ( + "errors" "sync" "time" @@ -25,6 +26,10 @@ import ( "github.com/coreos/etcd/pkg/transport" ) +var ( + ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") +) + // Client provides and manages an etcd v3 client session. type Client struct { // KV is the keyvalue API for the client's connection. @@ -182,6 +187,10 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli // connection is established. func dialEndpointList(c *Client) (*grpc.ClientConn, error) { var err error + if len(c.Endpoints()) == 0 { + return nil, ErrNoAvailableEndpoints + } + for _, ep := range c.Endpoints() { conn, curErr := c.Dial(ep) if curErr != nil { From c1bea4ea6e12f600deb0fd35ececbf5a7ff320a8 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 1 Feb 2016 16:30:57 -0800 Subject: [PATCH 3/4] clientv3: return on RPC(context deadline or cancelled) error --- clientv3/lease.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/clientv3/lease.go b/clientv3/lease.go index 2be2abc12b4..6c45bbb0a8c 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -186,6 +186,7 @@ func (l *lessor) Close() error { l.stopCancel() l.stream = nil + return nil } @@ -213,11 +214,25 @@ func (l *lessor) recvKeepAliveLoop() { return } + defer func() { + l.mu.Lock() + defer l.mu.Unlock() + + for _, ch := range l.keepAlives { + close(ch) + } + }() + for { stream := l.getKeepAliveStream() resp, err := stream.Recv() if err != nil { + if isRPCError(err) { + l.Close() + return + } + err = l.switchRemoteAndStream(err) if err != nil { l.Close() @@ -280,6 +295,10 @@ func (l *lessor) sendKeepAliveLoop() { r := &pb.LeaseKeepAliveRequest{ID: int64(id)} err = stream.Send(r) if err != nil { + if isRPCError(err) { + l.Close() + return + } break } } From 9f50cd24a75d4c3af3ea0d9a4c5146bf25136bb7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 1 Feb 2016 16:31:21 -0800 Subject: [PATCH 4/4] clientv3/integration: test on lease close --- clientv3/integration/lease_test.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 96e7d632641..ee6ab1c9ba5 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -102,7 +102,6 @@ func TestLeaseKeepAlive(t *testing.T) { defer clus.Terminate(t) lapi := clientv3.NewLease(clus.RandClient()) - defer lapi.Close() resp, err := lapi.Create(context.Background(), 10) if err != nil { @@ -114,10 +113,21 @@ func TestLeaseKeepAlive(t *testing.T) { t.Errorf("failed to keepalive lease %v", kerr) } - kresp := <-rc + kresp, ok := <-rc + if !ok { + t.Errorf("chan is closed, want not closed") + } + if kresp.ID != resp.ID { t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) } + + lapi.Close() + + _, ok = <-rc + if ok { + t.Errorf("chan is not closed, want lease Close() closes chan") + } } // TODO: add a client that can connect to all the members of cluster via unix sock. @@ -132,7 +142,6 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) { // TODO: change this line to get a cluster client lapi := clientv3.NewLease(clus.RandClient()) - defer lapi.Close() resp, err := lapi.Create(context.Background(), 10) if err != nil { @@ -165,4 +174,11 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) { if kresp.ID != resp.ID { t.Errorf("ID = %x, want %x", kresp.ID, resp.ID) } + + lapi.Close() + + _, ok := <-rc + if ok { + t.Errorf("chan is not closed, want lease Close() closes chan") + } }