Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3: don't halt lease client if there is a lease error #7866

Closed
wants to merge 8 commits into from
7 changes: 2 additions & 5 deletions clientv3/concurrency/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,9 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
}

ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
return nil, err
}

keepAlive := client.KeepAlive(ctx, id)
donec := make(chan struct{})

s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

// keep the lease alive until client error or cancelled context
Expand Down
15 changes: 8 additions & 7 deletions clientv3/example_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,13 @@ func ExampleLease_keepAlive() {
}

// the key 'foo' will be kept forever
ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
}
ch := cli.KeepAlive(context.TODO(), resp.ID)

ka := <-ch
if ka.Err != nil {
log.Fatal(ka.Err)
}

fmt.Println("ttl:", ka.TTL)
// Output: ttl: 5
}
Expand All @@ -131,9 +132,9 @@ func ExampleLease_keepAliveOnce() {
}

// to renew the lease only once
ka, kaerr := cli.KeepAliveOnce(context.TODO(), resp.ID)
if kaerr != nil {
log.Fatal(kaerr)
ka := cli.KeepAliveOnce(context.TODO(), resp.ID)
if ka.Err != nil {
log.Fatal(ka.Err)
}

fmt.Println("ttl:", ka.TTL)
Expand Down
76 changes: 23 additions & 53 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ func TestLeaseKeepAliveOnce(t *testing.T) {
t.Errorf("failed to create lease %v", err)
}

_, err = lapi.KeepAliveOnce(context.Background(), resp.ID)
if err != nil {
t.Errorf("failed to keepalive lease %v", err)
ka := lapi.KeepAliveOnce(context.Background(), resp.ID)
if ka.Err != nil {
t.Errorf("failed to keepalive lease %v", ka.Err)
}

_, err = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0))
if err != rpctypes.ErrLeaseNotFound {
t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, err)
ka = lapi.KeepAliveOnce(context.Background(), clientv3.LeaseID(0))
if ka.Err != rpctypes.ErrLeaseNotFound {
t.Errorf("expected %v, got %v", rpctypes.ErrLeaseNotFound, ka.Err)
}
}

Expand All @@ -129,10 +129,7 @@ func TestLeaseKeepAlive(t *testing.T) {
t.Errorf("failed to create lease %v", err)
}

rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
rc := lapi.KeepAlive(context.Background(), resp.ID)

kresp, ok := <-rc
if !ok {
Expand Down Expand Up @@ -163,11 +160,7 @@ func TestLeaseKeepAliveOneSecond(t *testing.T) {
if err != nil {
t.Errorf("failed to create lease %v", err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}

rc := cli.KeepAlive(context.Background(), resp.ID)
for i := 0; i < 3; i++ {
if _, ok := <-rc; !ok {
t.Errorf("chan is closed, want not closed")
Expand All @@ -193,10 +186,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {
t.Errorf("failed to create lease %v", err)
}

rc, kerr := lapi.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Errorf("failed to keepalive lease %v", kerr)
}
rc := lapi.KeepAlive(context.Background(), resp.ID)

kresp := <-rc
if kresp.ID != resp.ID {
Expand Down Expand Up @@ -230,7 +220,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {

type leaseCh struct {
lid clientv3.LeaseID
ch <-chan *clientv3.LeaseKeepAliveResponse
ch clientv3.LeaseKeepAliveChan
}

// TestLeaseKeepAliveNotFound ensures a revoked lease won't stop other keep alives
Expand All @@ -247,10 +237,7 @@ func TestLeaseKeepAliveNotFound(t *testing.T) {
if rerr != nil {
t.Fatal(rerr)
}
kach, kaerr := cli.KeepAlive(context.Background(), resp.ID)
if kaerr != nil {
t.Fatal(kaerr)
}
kach := cli.KeepAlive(context.Background(), resp.ID)
lchs = append(lchs, leaseCh{resp.ID, kach})
}

Expand Down Expand Up @@ -375,10 +362,7 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Fatal(kerr)
}
rc := cli.KeepAlive(context.Background(), resp.ID)
kresp := <-rc
if kresp.ID != resp.ID {
t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
Expand All @@ -397,9 +381,10 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {

// some keep-alives may still be buffered; drain until close
timer := time.After(time.Duration(kresp.TTL) * time.Second)
for kresp != nil {
loop := true
for loop {
select {
case kresp = <-rc:
case _, loop = <-rc:
case <-timer:
t.Fatalf("keepalive channel did not close")
}
Expand All @@ -423,10 +408,7 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) {
}
// keep client disconnected
clus.Members[0].Stop(t)
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Fatal(kerr)
}
rc := cli.KeepAlive(context.Background(), resp.ID)
select {
case ka, ok := <-rc:
if ok {
Expand Down Expand Up @@ -454,10 +436,7 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rc, kerr := cli.KeepAlive(context.Background(), resp.ID)
if kerr != nil {
t.Fatal(kerr)
}
rc := cli.KeepAlive(context.Background(), resp.ID)
if kresp := <-rc; kresp.ID != resp.ID {
t.Fatalf("ID = %x, want %x", kresp.ID, resp.ID)
}
Expand Down Expand Up @@ -580,10 +559,7 @@ func TestLeaseRenewLostQuorum(t *testing.T) {

kctx, kcancel := context.WithCancel(context.Background())
defer kcancel()
ka, err := cli.KeepAlive(kctx, r.ID)
if err != nil {
t.Fatal(err)
}
ka := cli.KeepAlive(kctx, r.ID)
// consume first keepalive so next message sends when cluster is down
<-ka
lastKa := time.Now()
Expand Down Expand Up @@ -630,9 +606,9 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
}
cli.Close()

_, err = cli.KeepAlive(ctx, resp.ID)
if _, ok := err.(clientv3.ErrKeepAliveHalted); !ok {
t.Fatalf("expected %T, got %v(%T)", clientv3.ErrKeepAliveHalted{}, err, err)
ka := cli.KeepAlive(ctx, resp.ID)
if resp, ok := <-ka; ok {
t.Fatalf("expected closed channel, got response %+v", resp)
}
}

Expand Down Expand Up @@ -707,15 +683,9 @@ func TestLeaseWithRequireLeader(t *testing.T) {
t.Fatal(err2)
}
// kaReqLeader close if the leader is lost
kaReqLeader, kerr1 := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID)
if kerr1 != nil {
t.Fatal(kerr1)
}
kaReqLeader := c.KeepAlive(clientv3.WithRequireLeader(context.TODO()), lid1.ID)
// kaWait will wait even if the leader is lost
kaWait, kerr2 := c.KeepAlive(context.TODO(), lid2.ID)
if kerr2 != nil {
t.Fatal(kerr2)
}
kaWait := c.KeepAlive(context.TODO(), lid2.ID)

select {
case <-kaReqLeader:
Expand Down
Loading