Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connectivity state transitions when dialing #1596

Merged
merged 7 commits into from
Oct 23, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions balancer/roundrobin/roundrobin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ func TestOneBackend(t *testing.T) {
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
}
Expand All @@ -143,7 +143,7 @@ func TestBackendsRoundRobin(t *testing.T) {
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

Expand All @@ -158,7 +158,7 @@ func TestBackendsRoundRobin(t *testing.T) {
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() == test.addresses[si] {
Expand All @@ -173,7 +173,7 @@ func TestBackendsRoundRobin(t *testing.T) {
}

for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
Expand Down Expand Up @@ -202,13 +202,13 @@ func TestAddressesRemoved(t *testing.T) {
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

Expand Down Expand Up @@ -248,7 +248,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
defer wg.Done()
// This RPC blocks until cc is closed.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded {
t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
}
cancel()
Expand Down Expand Up @@ -278,15 +278,15 @@ func TestNewAddressWhileBlocking(t *testing.T) {
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
}

Expand All @@ -298,7 +298,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
go func() {
defer wg.Done()
// This RPC blocks until NewAddress is called.
testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false))
testc.EmptyCall(context.Background(), &testpb.Empty{})
}()
}
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestOneServerDown(t *testing.T) {
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

Expand All @@ -342,7 +342,7 @@ func TestOneServerDown(t *testing.T) {
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() == test.addresses[si] {
Expand All @@ -357,7 +357,7 @@ func TestOneServerDown(t *testing.T) {
}

for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
Expand All @@ -371,7 +371,7 @@ func TestOneServerDown(t *testing.T) {
// Loop until see server[backendCount-1] twice without seeing server[backendCount].
var targetSeen int
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
switch p.Addr.String() {
Expand All @@ -390,7 +390,7 @@ func TestOneServerDown(t *testing.T) {
t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestAllServersDown(t *testing.T) {
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}

Expand All @@ -435,7 +435,7 @@ func TestAllServersDown(t *testing.T) {
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() == test.addresses[si] {
Expand All @@ -450,7 +450,7 @@ func TestAllServersDown(t *testing.T) {
}

for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
Expand Down
4 changes: 2 additions & 2 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,15 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.ac = ac
ac.acbw = acbw
if acState != connectivity.Idle {
ac.connect(false)
ac.connect()
}
}
}

func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
acbw.ac.connect(false)
acbw.ac.connect()
}

func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
Expand Down
2 changes: 1 addition & 1 deletion call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestInvokeCancelClosedNonFailFast(t *testing.T) {
req := "hello"
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := Invoke(ctx, "/foo/bar", &req, &reply, cc, FailFast(false)); err == nil {
if err := Invoke(ctx, "/foo/bar", &req, &reply, cc); err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is called TestInvokeCancelClosedNonFailFast - did you mean to make this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Reverted.

t.Fatalf("canceled invoke on closed connection should fail")
}
server.stop()
Expand Down
49 changes: 22 additions & 27 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
// This was part of resetAddrConn, keep it here to make the diff look clean.
func (ac *addrConn) connect(block bool) error {
func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
Expand All @@ -718,32 +718,18 @@ func (ac *addrConn) connect(block bool) error {
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()

if block {
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(); err != nil {
grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return e.Origin()
}
return err
return
}
// Start to monitor the error status of transport.
go ac.transportMonitor()
} else {
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(); err != nil {
grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
ac.transportMonitor()
}()
}
ac.transportMonitor()
}()
return nil
}

Expand Down Expand Up @@ -904,15 +890,17 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {

// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
//
// The connectivity state of ac will be set to TransientFailure if it's not
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this accurate? It seems that ac.state will be set to connectivity.Connecting unconditionally and then to connectivity. TransientFailure on every attempt; I can't tell which part of this is sensitive to "if it's not first time doing resetTransport."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state is set by transportMonitor to TransientFailure before this function is called.
I removed this comment and add a comment in transportMonitor.

// first time doing resetTransport.
//
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
Expand All @@ -936,8 +924,10 @@ func (ac *addrConn) resetTransport() error {
return errConnClosing
}
ac.printf("connecting")
ac.state = connectivity.Connecting
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.state != connectivity.Connecting {
ac.state = connectivity.Connecting
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
// copy ac.addrs in case of race
addrsIter := make([]resolver.Address, len(ac.addrs))
copy(addrsIter, ac.addrs)
Expand Down Expand Up @@ -1031,6 +1021,11 @@ func (ac *addrConn) transportMonitor() {
ac.adjustParams(t.GetGoAwayReason())
default:
}
ac.mu.Lock()
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
ac.mu.Unlock()
if err := ac.resetTransport(); err != nil {
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
Expand Down Expand Up @@ -1102,7 +1097,7 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
ac.mu.Unlock()
// Trigger idle ac to connect.
if idle {
ac.connect(false)
ac.connect()
}
return nil, false
}
Expand Down
4 changes: 2 additions & 2 deletions pickfirst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
go func() {
defer wg.Done()
// This RPC blocks until NewAddress is called.
Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
}()
}
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) {
go func() {
defer wg.Done()
// This RPC blocks until NewAddress is called.
Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
}()
}
time.Sleep(50 * time.Millisecond)
Expand Down