From 441e764e2e9ee0d1bd9ced5aed73191253abe2dc Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 2 Jul 2021 13:46:22 -0700 Subject: [PATCH 1/5] clientconn: do not automatically reconnect addrConns; go idle instead --- balancer/balancer.go | 17 +- clientconn.go | 266 ++++++++++++++-------------- clientconn_state_transition_test.go | 9 +- clientconn_test.go | 28 ++- pickfirst.go | 16 +- test/channelz_test.go | 18 +- test/creds_test.go | 1 + test/end2end_test.go | 15 +- 8 files changed, 219 insertions(+), 151 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index ab531f4c0b8..6a1b779edc2 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -353,8 +353,9 @@ var ErrBadResolverState = errors.New("bad resolver state") // // It's not thread safe. type ConnectivityStateEvaluator struct { - numReady uint64 // Number of addrConns in ready state. - numConnecting uint64 // Number of addrConns in connecting state. + numReady uint64 // Number of addrConns in ready state. + numConnecting uint64 // Number of addrConns in connecting state. + numTransientFailure uint64 // Number of addrConns in transient failure state. } // RecordTransition records state change happening in subConn and based on that @@ -362,9 +363,10 @@ type ConnectivityStateEvaluator struct { // // - If at least one SubConn in Ready, the aggregated state is Ready; // - Else if at least one SubConn in Connecting, the aggregated state is Connecting; -// - Else the aggregated state is TransientFailure. +// - Else if at least one SubConn is TransientFailure, the aggregated state is Transient Failure; +// - Else the aggregated state is Idle // -// Idle and Shutdown are not considered. +// Shutdown is not considered. func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State { // Update counters. 
for idx, state := range []connectivity.State{oldState, newState} { @@ -374,6 +376,8 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne cse.numReady += updateVal case connectivity.Connecting: cse.numConnecting += updateVal + case connectivity.TransientFailure: + cse.numTransientFailure += updateVal } } @@ -384,5 +388,8 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne if cse.numConnecting > 0 { return connectivity.Connecting } - return connectivity.TransientFailure + if cse.numTransientFailure > 0 { + return connectivity.TransientFailure + } + return connectivity.Idle } diff --git a/clientconn.go b/clientconn.go index b2bccfed136..7dd4731b20d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -322,6 +322,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * // A blocking dial blocks until the clientConn is ready. if cc.dopts.block { for { + cc.Connect() s := cc.GetState() if s == connectivity.Ready { break @@ -539,12 +540,24 @@ func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connec // // Experimental // -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. +// Notice: This API is EXPERIMENTAL and may be changed or removed in a later +// release. func (cc *ClientConn) GetState() connectivity.State { return cc.csMgr.getState() } +// Connect causes all subchannels in the ClientConn to attempt to connect if +// the channel is idle. +func (cc *ClientConn) Connect() { + if cc.GetState() == connectivity.Idle { + cc.mu.Lock() + for ac := range cc.conns { + go ac.connect() + } + cc.mu.Unlock() + } +} + func (cc *ClientConn) scWatcher() { for { select { @@ -883,6 +896,10 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { // ac.state is Ready, try to find the connected address. var curAddrFound bool for _, a := range addrs { + // a.ServerName takes precedent over ClientConn authority, if present. 
+ if a.ServerName == "" { + a.ServerName = ac.cc.authority + } if reflect.DeepEqual(ac.curAddr, a) { curAddrFound = true break @@ -1135,112 +1152,86 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) { } func (ac *addrConn) resetTransport() { - for i := 0; ; i++ { - if i > 0 { - ac.cc.resolveNow(resolver.ResolveNowOptions{}) - } + ac.mu.Lock() + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + return + } + + addrs := ac.addrs + backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) + // This will be the duration that dial gets to finish. + dialDuration := minConnectTimeout + if ac.dopts.minConnectTimeout != nil { + dialDuration = ac.dopts.minConnectTimeout() + } + if dialDuration < backoffFor { + // Give dial more time as we keep failing to connect. + dialDuration = backoffFor + } + // We can potentially spend all the time trying the first address, and + // if the server accepts the connection and then hangs, the following + // addresses will never be tried. + // + // The spec doesn't mention what should be done for multiple addresses. + // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm + connectDeadline := time.Now().Add(dialDuration) + + ac.updateConnectivityState(connectivity.Connecting, nil) + ac.mu.Unlock() + + if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil { + // After exhausting all addresses, the addrConn enters + // TRANSIENT_FAILURE. ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() return } + ac.updateConnectivityState(connectivity.TransientFailure, err) - addrs := ac.addrs - backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) - // This will be the duration that dial gets to finish. - dialDuration := minConnectTimeout - if ac.dopts.minConnectTimeout != nil { - dialDuration = ac.dopts.minConnectTimeout() - } - - if dialDuration < backoffFor { - // Give dial more time as we keep failing to connect. 
- dialDuration = backoffFor - } - // We can potentially spend all the time trying the first address, and - // if the server accepts the connection and then hangs, the following - // addresses will never be tried. - // - // The spec doesn't mention what should be done for multiple addresses. - // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm - connectDeadline := time.Now().Add(dialDuration) - - ac.updateConnectivityState(connectivity.Connecting, nil) - ac.transport = nil + // Backoff. + b := ac.resetBackoff ac.mu.Unlock() - newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) - if err != nil { - // After exhausting all addresses, the addrConn enters - // TRANSIENT_FAILURE. + timer := time.NewTimer(backoffFor) + select { + case <-timer.C: ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - return - } - ac.updateConnectivityState(connectivity.TransientFailure, err) - - // Backoff. - b := ac.resetBackoff + ac.backoffIdx++ ac.mu.Unlock() - - timer := time.NewTimer(backoffFor) - select { - case <-timer.C: - ac.mu.Lock() - ac.backoffIdx++ - ac.mu.Unlock() - case <-b: - timer.Stop() - case <-ac.ctx.Done(): - timer.Stop() - return - } - continue + case <-b: + timer.Stop() + case <-ac.ctx.Done(): + timer.Stop() + return } + ac.cc.resolveNow(resolver.ResolveNowOptions{}) ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN")) - return + if ac.state != connectivity.Shutdown { + ac.updateConnectivityState(connectivity.Idle, err) } - ac.curAddr = addr - ac.transport = newTr - ac.backoffIdx = 0 - - hctx, hcancel := context.WithCancel(ac.ctx) - ac.startHealthCheck(hctx) ac.mu.Unlock() - - // Block until the created transport is down. And when this happens, - // we restart from the top of the addr list. - <-reconnect.Done() - hcancel() - // restart connecting - the top of the loop will set state to - // CONNECTING. 
This is against the current connectivity semantics doc, - // however it allows for graceful behavior for RPCs not yet dispatched - // - unfortunate timing would otherwise lead to the RPC failing even - // though the TRANSIENT_FAILURE state (called for by the doc) would be - // instantaneous. - // - // Ideally we should transition to Idle here and block until there is - // RPC activity that leads to the balancer requesting a reconnect of - // the associated SubConn. + return } + // Success; reset backoff. + ac.mu.Lock() + ac.backoffIdx = 0 + ac.mu.Unlock() } // tryAllAddrs tries to creates a connection to the addresses, and stop when at the // first successful one. It returns the transport, the address and a Event in // the successful case. The Event fires when the returned transport disconnects. -func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { +func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error { var firstConnErr error for _, addr := range addrs { ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() - return nil, resolver.Address{}, nil, errConnClosing + return errConnClosing } ac.cc.mu.RLock() @@ -1255,9 +1246,9 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr) - newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) + err := ac.createTransport(addr, copts, connectDeadline) if err == nil { - return newTr, addr, reconnect, nil + return nil } if firstConnErr == nil { firstConnErr = err @@ -1266,57 +1257,46 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T } // Couldn't connect to any address. - return nil, resolver.Address{}, nil, firstConnErr + return firstConnErr } -// createTransport creates a connection to addr. 
It returns the transport and a -// Event in the successful case. The Event fires when the returned transport -// disconnects. -func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { - prefaceReceived := make(chan struct{}) - onCloseCalled := make(chan struct{}) - reconnect := grpcsync.NewEvent() +// createTransport creates a connection to addr. It returns the transport or an +// error. +func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { + prefaceReceived := grpcsync.NewEvent() + connClosed := grpcsync.NewEvent() // addr.ServerName takes precedent over ClientConn authority, if present. if addr.ServerName == "" { addr.ServerName = ac.cc.authority } - once := sync.Once{} - onGoAway := func(r transport.GoAwayReason) { - ac.mu.Lock() - ac.adjustParams(r) - once.Do(func() { - if ac.state == connectivity.Ready { - // Prevent this SubConn from being used for new RPCs by setting its - // state to Connecting. - // - // TODO: this should be Idle when grpc-go properly supports it. - ac.updateConnectivityState(connectivity.Connecting, nil) - } - }) - ac.mu.Unlock() - reconnect.Fire() - } + // Because onPreface may be called before NewClientTransport returns, we + // pass the transport to onPreface via a channel. + hctx, hcancel := context.WithCancel(ac.ctx) + hcStarted := false // protected by ac.mu onClose := func() { ac.mu.Lock() - once.Do(func() { - if ac.state == connectivity.Ready { - // Prevent this SubConn from being used for new RPCs by setting its - // state to Connecting. - // - // TODO: this should be Idle when grpc-go properly supports it. 
- ac.updateConnectivityState(connectivity.Connecting, nil) - } - }) - ac.mu.Unlock() - close(onCloseCalled) - reconnect.Fire() + defer ac.mu.Unlock() + defer connClosed.Fire() + if !hcStarted { + // We didn't start the health check or set the state to READY, so + // no need to do anything else here. + return + } + hcancel() + ac.transport = nil + if ac.state != connectivity.Shutdown { + ac.updateConnectivityState(connectivity.Idle, nil) + } } - onPrefaceReceipt := func() { - close(prefaceReceived) + onGoAway := func(r transport.GoAwayReason) { + ac.mu.Lock() + ac.adjustParams(r) + ac.mu.Unlock() + onClose() } connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) @@ -1325,27 +1305,47 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne copts.ChannelzParentID = ac.channelzID } - newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose) + newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose) if err != nil { // newTr is either nil, or closed. - channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err) - return nil, nil, err + channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err) + return err } select { case <-time.After(time.Until(connectDeadline)): // We didn't get the preface in time. - newTr.Close(fmt.Errorf("failed to receive server preface within timeout")) - channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. 
Reconnecting...", addr) - return nil, nil, errors.New("timed out waiting for server handshake") - case <-prefaceReceived: + err := fmt.Errorf("failed to receive server preface within timeout") + newTr.Close(err) + channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err) + return err + case <-prefaceReceived.Done(): // We got the preface - huzzah! things are good. - case <-onCloseCalled: - // The transport has already closed - noop. - return nil, nil, errors.New("connection closed") - // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. + ac.mu.Lock() + defer ac.mu.Unlock() + defer prefaceReceived.Fire() + if connClosed.HasFired() { + // onClose called first; go idle but do nothing else. + if ac.state != connectivity.Shutdown { + ac.updateConnectivityState(connectivity.Idle, nil) + } + return nil + } + ac.curAddr = addr + ac.transport = newTr + hcStarted = true + ac.startHealthCheck(hctx) // Will set state to READY if appropriate. + return nil + case <-connClosed.Done(): + // The transport has already closed. If we received the preface, too, + // this is not an error. 
+ select { + case <-prefaceReceived.Done(): + return nil + default: + return errors.New("connection closed before server preface received") + } } - return newTr, reconnect, nil } // startHealthCheck starts the health checking stream (RPC) to watch the health diff --git a/clientconn_state_transition_test.go b/clientconn_state_transition_test.go index cd1213fb4fd..2090c8de689 100644 --- a/clientconn_state_transition_test.go +++ b/clientconn_state_transition_test.go @@ -75,7 +75,7 @@ func (s) TestStateTransitions_SingleAddress(t *testing.T) { }, }, { - desc: "When the connection is closed, the client enters TRANSIENT FAILURE.", + desc: "When the connection is closed before the preface is sent, the client enters TRANSIENT FAILURE.", want: []connectivity.State{ connectivity.Connecting, connectivity.TransientFailure, @@ -167,6 +167,7 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s t.Fatal(err) } defer client.Close() + go stayConnected(client) stateNotifications := testBalancerBuilder.nextStateNotifier() @@ -193,11 +194,12 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s } } -// When a READY connection is closed, the client enters CONNECTING. +// When a READY connection is closed, the client enters IDLE then CONNECTING. 
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { want := []connectivity.State{ connectivity.Connecting, connectivity.Ready, + connectivity.Idle, connectivity.Connecting, } @@ -240,6 +242,7 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { t.Fatal(err) } defer client.Close() + go stayConnected(client) stateNotifications := testBalancerBuilder.nextStateNotifier() @@ -359,6 +362,7 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { want := []connectivity.State{ connectivity.Connecting, connectivity.Ready, + connectivity.Idle, connectivity.Connecting, } @@ -415,6 +419,7 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { t.Fatal(err) } defer client.Close() + go stayConnected(client) stateNotifications := testBalancerBuilder.nextStateNotifier() diff --git a/clientconn_test.go b/clientconn_test.go index a50db9419c2..da765615be1 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -217,7 +217,7 @@ func (s) TestDialWaitsForServerSettingsAndFails(t *testing.T) { client.Close() t.Fatalf("Unexpected success (err=nil) while dialing") } - expectedMsg := "server handshake" + expectedMsg := "server preface" if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) || !strings.Contains(err.Error(), expectedMsg) { t.Fatalf("DialContext(_) = %v; want a message that includes both %q and %q", err, context.DeadlineExceeded.Error(), expectedMsg) } @@ -289,6 +289,9 @@ func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { if err != nil { t.Fatalf("Error while dialing. Err: %v", err) } + + go stayConnected(client) + // wait for connection to be accepted on the server. timer := time.NewTimer(time.Second * 10) select { @@ -311,9 +314,7 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { defer lis.Close() done := make(chan struct{}) go func() { // Launch the server. 
- defer func() { - close(done) - }() + defer close(done) conn, err := lis.Accept() // Accept the connection only to close it immediately. if err != nil { t.Errorf("Error while accepting. Err: %v", err) @@ -340,13 +341,13 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { prevAt = meow } }() - client, err := Dial(lis.Addr().String(), WithInsecure()) + cc, err := Dial(lis.Addr().String(), WithInsecure()) if err != nil { t.Fatalf("Error while dialing. Err: %v", err) } - defer client.Close() + defer cc.Close() + go stayConnected(cc) <-done - } func (s) TestWithTimeout(t *testing.T) { @@ -831,6 +832,7 @@ func (s) TestResetConnectBackoff(t *testing.T) { t.Fatalf("Dial() = _, %v; want _, nil", err) } defer cc.Close() + go stayConnected(cc) select { case <-dials: case <-time.NewTimer(10 * time.Second).C: @@ -985,6 +987,7 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) { t.Fatal(err) } defer client.Close() + go stayConnected(client) timeout := time.After(5 * time.Second) @@ -1112,3 +1115,14 @@ func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T t.Fatal("default service config failed to be applied after 1s") } } + +// stayConnected makes cc stay connected by repeatedly calling cc.Connect() +// until the state becomes Shutdown or until 10 seconds elapses. 
+func stayConnected(cc *ClientConn) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + for state := cc.GetState(); state != connectivity.Shutdown && cc.WaitForStateChange(ctx, state); state = cc.GetState() { + cc.Connect() + } +} diff --git a/pickfirst.go b/pickfirst.go index b858c2a5e63..f188f5c1532 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -107,10 +107,12 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S } switch s.ConnectivityState { - case connectivity.Ready, connectivity.Idle: + case connectivity.Ready: b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}}) case connectivity.Connecting: b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}}) + case connectivity.Idle: + b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &pickerKicker{sc: sc, err: balancer.ErrNoSubConnAvailable}}) case connectivity.TransientFailure: b.cc.UpdateState(balancer.State{ ConnectivityState: s.ConnectivityState, @@ -131,6 +133,18 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { return p.result, p.err } +// pickerKicker kicks the SubConn into connecting when Pick is called. +type pickerKicker struct { + sc balancer.SubConn + result balancer.PickResult + err error +} + +func (pk *pickerKicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + pk.sc.Connect() + return pk.result, pk.err +} + func init() { balancer.Register(newPickfirstBuilder()) } diff --git a/test/channelz_test.go b/test/channelz_test.go index 47e7eb92716..6cb09dd8d89 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -1689,8 +1689,22 @@ func (s) TestCZSubChannelPickedNewAddress(t *testing.T) { } te.srvs[0].Stop() te.srvs[1].Stop() - // Here, we just wait for all sockets to be up. 
In the future, if we implement - // IDLE, we may need to make several rpc calls to create the sockets. + // Here, we just wait for all sockets to be up. Make several rpc calls to + // create the sockets since we do not automatically reconnect. + done := make(chan struct{}) + defer close(done) + go func() { + for { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + tc.EmptyCall(ctx, &testpb.Empty{}) + cancel() + select { + case <-time.After(10 * time.Millisecond): + case <-done: + return + } + } + }() if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0, 0) if len(tcs) != 1 { diff --git a/test/creds_test.go b/test/creds_test.go index 6b3fc2a4607..0c601864181 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -165,6 +165,7 @@ func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials { func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "empty"}) te.userAgent = testAppUA + te.nonBlockingDial = true te.startServer(&testServer{security: te.e.security}) defer te.tearDown() diff --git a/test/end2end_test.go b/test/end2end_test.go index 3d941b187bf..0fe0abb5c57 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -7123,7 +7123,20 @@ func (s) TestGoAwayThenClose(t *testing.T) { // Send GO_AWAY to connection 1. go s1.GracefulStop() - // Wait for connection 2 to be established. + // Wait for the ClientConn to enter IDLE state. + state := cc.GetState() + for ; state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() { + } + if state != connectivity.Idle { + t.Fatalf("timed out waiting for IDLE channel state; last state = %v", state) + } + + // Initiate another RPC to create another connection. 
+ if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { + t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) + } + + // Assert that connection 2 has been established. <-conn2Established.Done() // Close connection 1. From 71cc75d5877a24f480d1864966d1736536a6d778 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 3 Aug 2021 10:55:24 -0700 Subject: [PATCH 2/5] review comments --- balancer_conn_wrappers.go | 14 +++++++------- clientconn.go | 9 ++++----- pickfirst.go | 17 ++++++++--------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index dd839796397..0ddb24f375f 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -239,17 +239,17 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { return } - ac, err := cc.newAddrConn(addrs, opts) + newAC, err := cc.newAddrConn(addrs, opts) if err != nil { channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) return } - acbw.ac = ac - ac.mu.Lock() - ac.acbw = acbw - ac.mu.Unlock() + acbw.ac = newAC + newAC.mu.Lock() + newAC.acbw = acbw + newAC.mu.Unlock() if acState != connectivity.Idle { - ac.connect() + go newAC.connect() } } } @@ -257,7 +257,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { func (acbw *acBalancerWrapper) Connect() { acbw.mu.Lock() defer acbw.mu.Unlock() - acbw.ac.connect() + go acbw.ac.connect() } func (acbw *acBalancerWrapper) getAddrConn() *addrConn { diff --git a/clientconn.go b/clientconn.go index 7dd4731b20d..d8163f35918 100644 --- a/clientconn.go +++ b/clientconn.go @@ -858,8 +858,7 @@ func (ac *addrConn) connect() error { ac.updateConnectivityState(connectivity.Connecting, nil) ac.mu.Unlock() - // Start a goroutine connecting to the server asynchronously. 
- go ac.resetTransport() + ac.resetTransport() return nil } @@ -1182,6 +1181,7 @@ func (ac *addrConn) resetTransport() { ac.mu.Unlock() if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil { + ac.cc.resolveNow(resolver.ResolveNowOptions{}) // After exhausting all addresses, the addrConn enters // TRANSIENT_FAILURE. ac.mu.Lock() @@ -1207,7 +1207,6 @@ func (ac *addrConn) resetTransport() { timer.Stop() return } - ac.cc.resolveNow(resolver.ResolveNowOptions{}) ac.mu.Lock() if ac.state != connectivity.Shutdown { @@ -1271,8 +1270,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne addr.ServerName = ac.cc.authority } - // Because onPreface may be called before NewClientTransport returns, we - // pass the transport to onPreface via a channel. hctx, hcancel := context.WithCancel(ac.ctx) hcStarted := false // protected by ac.mu @@ -1287,6 +1284,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne } hcancel() ac.transport = nil + // Refresh the name resolver + ac.cc.resolveNow(resolver.ResolveNowOptions{}) if ac.state != connectivity.Shutdown { ac.updateConnectivityState(connectivity.Idle, nil) } diff --git a/pickfirst.go b/pickfirst.go index f188f5c1532..b118e6abc76 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -112,7 +112,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S case connectivity.Connecting: b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}}) case connectivity.Idle: - b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &pickerKicker{sc: sc, err: balancer.ErrNoSubConnAvailable}}) + b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}}) case connectivity.TransientFailure: b.cc.UpdateState(balancer.State{ ConnectivityState: s.ConnectivityState, @@ -133,16 +133,15 @@ func (p *picker) Pick(info 
balancer.PickInfo) (balancer.PickResult, error) { return p.result, p.err } -// pickerKicker kicks the SubConn into connecting when Pick is called. -type pickerKicker struct { - sc balancer.SubConn - result balancer.PickResult - err error +// idlePicker is used with the SubConn is IDLE and kicks the SubConn into +// CONNECTING when Pick is called. +type idlePicker struct { + sc balancer.SubConn } -func (pk *pickerKicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - pk.sc.Connect() - return pk.result, pk.err +func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + i.sc.Connect() + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } func init() { From 3ab08d98d9ea70a63558b387e97d02e726d6376d Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 3 Aug 2021 13:46:51 -0700 Subject: [PATCH 3/5] WHEN --- pickfirst.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pickfirst.go b/pickfirst.go index b118e6abc76..d32161c748d 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -133,7 +133,7 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { return p.result, p.err } -// idlePicker is used with the SubConn is IDLE and kicks the SubConn into +// idlePicker is used when the SubConn is IDLE and kicks the SubConn into // CONNECTING when Pick is called. type idlePicker struct { sc balancer.SubConn From c1747631be3d9f0000363852df61ecb5782821ea Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 6 Aug 2021 12:54:27 -0700 Subject: [PATCH 4/5] review comments --- clientconn.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index d8163f35918..fd905ce60a0 100644 --- a/clientconn.go +++ b/clientconn.go @@ -547,11 +547,18 @@ func (cc *ClientConn) GetState() connectivity.State { } // Connect causes all subchannels in the ClientConn to attempt to connect if -// the channel is idle. +// the channel is idle. 
Does not wait for the connection attempts to begin +// before returning. +// +// Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a later +// release. func (cc *ClientConn) Connect() { if cc.GetState() == connectivity.Idle { cc.mu.Lock() for ac := range cc.conns { + // TODO: should this be a signal to the LB policy instead? go ac.connect() } cc.mu.Unlock() @@ -1262,6 +1269,8 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T // createTransport creates a connection to addr. It returns the transport or an // error. func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { + // TODO: Delete prefaceReceived and move the logic to wait for it into the + // transport. prefaceReceived := grpcsync.NewEvent() connClosed := grpcsync.NewEvent() From c2c64f069cf87c1eb349c7f23b957565ce70503a Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Mon, 9 Aug 2021 09:50:58 -0700 Subject: [PATCH 5/5] comments --- clientconn.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/clientconn.go b/clientconn.go index fd905ce60a0..b9e9eed4681 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1228,9 +1228,9 @@ func (ac *addrConn) resetTransport() { ac.mu.Unlock() } -// tryAllAddrs tries to creates a connection to the addresses, and stop when at the -// first successful one. It returns the transport, the address and a Event in -// the successful case. The Event fires when the returned transport disconnects. +// tryAllAddrs tries to create a connection to the addresses, and stops at +// the first successful one. It returns an error if no address was successfully +// connected, or updates ac appropriately with the new transport.
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error { var firstConnErr error for _, addr := range addrs { @@ -1266,8 +1266,9 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T return firstConnErr } -// createTransport creates a connection to addr. It returns the transport or an -// error. +// createTransport creates a connection to addr. It returns an error if the +// address was not successfully connected, or updates ac appropriately with the +// new transport. func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { // TODO: Delete prefaceReceived and move the logic to wait for it into the // transport.