From 1b4ebf7d2f8beff9ea16235d4fbd227b124ab2f4 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Mon, 13 Jan 2020 12:53:47 -0800 Subject: [PATCH] Revert "balancer: move Balancer and Picker to V2; delete legacy API (#3301)" This reverts commit 336cf8d76145dc5ebd517fd9c19e14c6822450b3. --- balancer.go | 391 +++++++++ balancer/balancer.go | 174 ++-- balancer/base/balancer.go | 106 ++- balancer/base/base.go | 37 +- balancer/grpclb/grpclb.go | 15 +- balancer/grpclb/grpclb_picker.go | 2 +- balancer/roundrobin/roundrobin.go | 6 +- balancer_conn_wrappers.go | 37 +- balancer_conn_wrappers_test.go | 39 +- balancer_switching_test.go | 30 +- balancer_test.go | 789 ++++++++++++++++++ balancer_v1_wrapper.go | 334 ++++++++ clientconn.go | 2 + clientconn_state_transition_test.go | 6 +- clientconn_test.go | 127 +++ dialoptions.go | 13 + internal/balancer/stub/stub.go | 94 --- picker_wrapper.go | 48 +- picker_wrapper_test.go | 8 +- pickfirst.go | 23 +- pickfirst_test.go | 14 +- resolver_conn_wrapper_test.go | 7 +- test/balancer_test.go | 264 +----- test/creds_test.go | 6 +- test/end2end_test.go | 16 +- vet.sh | 5 +- .../balancer/cdsbalancer/cdsbalancer.go | 15 +- .../balancer/cdsbalancer/cdsbalancer_test.go | 4 +- .../balancer/edsbalancer/balancergroup.go | 32 +- .../edsbalancer/balancergroup_test.go | 2 +- xds/internal/balancer/edsbalancer/eds.go | 12 + xds/internal/balancer/edsbalancer/eds_impl.go | 9 +- .../balancer/edsbalancer/eds_impl_priority.go | 4 +- .../balancer/edsbalancer/eds_impl_test.go | 16 +- xds/internal/balancer/edsbalancer/eds_test.go | 8 +- .../balancer/edsbalancer/test_util_test.go | 8 +- 36 files changed, 2144 insertions(+), 559 deletions(-) create mode 100644 balancer.go create mode 100644 balancer_test.go create mode 100644 balancer_v1_wrapper.go delete mode 100644 internal/balancer/stub/stub.go diff --git a/balancer.go b/balancer.go new file mode 100644 index 00000000000..a8eb0f47609 --- /dev/null +++ b/balancer.go @@ -0,0 +1,391 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "context" + "net" + "sync" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/naming" + "google.golang.org/grpc/status" +) + +// Address represents a server the client connects to. +// +// Deprecated: please use package balancer. +type Address struct { + // Addr is the server address on which a connection will be established. + Addr string + // Metadata is the information associated with Addr, which may be used + // to make load balancing decision. + Metadata interface{} +} + +// BalancerConfig specifies the configurations for Balancer. +// +// Deprecated: please use package balancer. May be removed in a future 1.x release. +type BalancerConfig struct { + // DialCreds is the transport credential the Balancer implementation can + // use to dial to a remote load balancer server. The Balancer implementations + // can ignore this if it does not need to talk to another party securely. + DialCreds credentials.TransportCredentials + // Dialer is the custom dialer the Balancer implementation can use to dial + // to a remote load balancer server. The Balancer implementations + // can ignore this if it doesn't need to talk to remote balancer. + Dialer func(context.Context, string) (net.Conn, error) +} + +// BalancerGetOptions configures a Get call. +// +// Deprecated: please use package balancer. May be removed in a future 1.x release. +type BalancerGetOptions struct { + // BlockingWait specifies whether Get should block when there is no + // connected address. + BlockingWait bool +} + +// Balancer chooses network addresses for RPCs. +// +// Deprecated: please use package balancer. May be removed in a future 1.x release. +type Balancer interface { + // Start does the initialization work to bootstrap a Balancer. For example, + // this function may start the name resolution and watch the updates. It will + // be called when dialing. + Start(target string, config BalancerConfig) error + // Up informs the Balancer that gRPC has a connection to the server at + // addr. It returns down which is called once the connection to addr gets + // lost or closed. + // TODO: It is not clear how to construct and take advantage of the meaningful error + // parameter for down. Need realistic demands to guide. + Up(addr Address) (down func(error)) + // Get gets the address of a server for the RPC corresponding to ctx. + // i) If it returns a connected address, gRPC internals issues the RPC on the + // connection to this address; + // ii) If it returns an address on which the connection is under construction + // (initiated by Notify(...)) but not connected, gRPC internals + // * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or + // Shutdown state; + // or + // * issues RPC on the connection otherwise. + // iii) If it returns an address on which the connection does not exist, gRPC + // internals treats it as an error and will fail the corresponding RPC. + // + // Therefore, the following is the recommended rule when writing a custom Balancer. + // If opts.BlockingWait is true, it should return a connected address or + // block if there is no connected address. It should respect the timeout or + // cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast + // RPCs), it should return an address it has notified via Notify(...) immediately + // instead of blocking. + // + // The function returns put which is called once the rpc has completed or failed. + // put can collect and report RPC stats to a remote load balancer. + // + // This function should only return the errors Balancer cannot recover by itself. + // gRPC internals will fail the RPC if an error is returned. + Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) + // Notify returns a channel that is used by gRPC internals to watch the addresses + // gRPC needs to connect. The addresses might be from a name resolver or remote + // load balancer. gRPC internals will compare it with the existing connected + // addresses. If the address Balancer notified is not in the existing connected + // addresses, gRPC starts to connect the address. If an address in the existing + // connected addresses is not in the notification list, the corresponding connection + // is shutdown gracefully. Otherwise, there are no operations to take. Note that + // the Address slice must be the full list of the Addresses which should be connected. + // It is NOT delta. + Notify() <-chan []Address + // Close shuts down the balancer. + Close() error +} + +// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch +// the name resolution updates and updates the addresses available correspondingly. +// +// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release. +func RoundRobin(r naming.Resolver) Balancer { + return &roundRobin{r: r} +} + +type addrInfo struct { + addr Address + connected bool +} + +type roundRobin struct { + r naming.Resolver + w naming.Watcher + addrs []*addrInfo // all the addresses the client should potentially connect + mu sync.Mutex + addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. + next int // index of the next address to return for Get() + waitCh chan struct{} // the channel to block when there is no connected address available + done bool // The Balancer is closed. +} + +func (rr *roundRobin) watchAddrUpdates() error { + updates, err := rr.w.Next() + if err != nil { + grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err) + return err + } + rr.mu.Lock() + defer rr.mu.Unlock() + for _, update := range updates { + addr := Address{ + Addr: update.Addr, + Metadata: update.Metadata, + } + switch update.Op { + case naming.Add: + var exist bool + for _, v := range rr.addrs { + if addr == v.addr { + exist = true + grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr) + break + } + } + if exist { + continue + } + rr.addrs = append(rr.addrs, &addrInfo{addr: addr}) + case naming.Delete: + for i, v := range rr.addrs { + if addr == v.addr { + copy(rr.addrs[i:], rr.addrs[i+1:]) + rr.addrs = rr.addrs[:len(rr.addrs)-1] + break + } + } + default: + grpclog.Errorln("Unknown update.Op ", update.Op) + } + } + // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified. + open := make([]Address, len(rr.addrs)) + for i, v := range rr.addrs { + open[i] = v.addr + } + if rr.done { + return ErrClientConnClosing + } + select { + case <-rr.addrCh: + default: + } + rr.addrCh <- open + return nil +} + +func (rr *roundRobin) Start(target string, config BalancerConfig) error { + rr.mu.Lock() + defer rr.mu.Unlock() + if rr.done { + return ErrClientConnClosing + } + if rr.r == nil { + // If there is no name resolver installed, it is not needed to + // do name resolution. In this case, target is added into rr.addrs + // as the only address available and rr.addrCh stays nil. + rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}}) + return nil + } + w, err := rr.r.Resolve(target) + if err != nil { + return err + } + rr.w = w + rr.addrCh = make(chan []Address, 1) + go func() { + for { + if err := rr.watchAddrUpdates(); err != nil { + return + } + } + }() + return nil +} + +// Up sets the connected state of addr and sends notification if there are pending +// Get() calls. +func (rr *roundRobin) Up(addr Address) func(error) { + rr.mu.Lock() + defer rr.mu.Unlock() + var cnt int + for _, a := range rr.addrs { + if a.addr == addr { + if a.connected { + return nil + } + a.connected = true + } + if a.connected { + cnt++ + } + } + // addr is only one which is connected. Notify the Get() callers who are blocking. + if cnt == 1 && rr.waitCh != nil { + close(rr.waitCh) + rr.waitCh = nil + } + return func(err error) { + rr.down(addr, err) + } +} + +// down unsets the connected state of addr. +func (rr *roundRobin) down(addr Address, err error) { + rr.mu.Lock() + defer rr.mu.Unlock() + for _, a := range rr.addrs { + if addr == a.addr { + a.connected = false + break + } + } +} + +// Get returns the next addr in the rotation. +func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) { + var ch chan struct{} + rr.mu.Lock() + if rr.done { + rr.mu.Unlock() + err = ErrClientConnClosing + return + } + + if len(rr.addrs) > 0 { + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + next := rr.next + for { + a := rr.addrs[next] + next = (next + 1) % len(rr.addrs) + if a.connected { + addr = a.addr + rr.next = next + rr.mu.Unlock() + return + } + if next == rr.next { + // Has iterated all the possible address but none is connected. + break + } + } + } + if !opts.BlockingWait { + if len(rr.addrs) == 0 { + rr.mu.Unlock() + err = status.Errorf(codes.Unavailable, "there is no address available") + return + } + // Returns the next addr on rr.addrs for failfast RPCs. + addr = rr.addrs[rr.next].addr + rr.next++ + rr.mu.Unlock() + return + } + // Wait on rr.waitCh for non-failfast RPCs. + if rr.waitCh == nil { + ch = make(chan struct{}) + rr.waitCh = ch + } else { + ch = rr.waitCh + } + rr.mu.Unlock() + for { + select { + case <-ctx.Done(): + err = ctx.Err() + return + case <-ch: + rr.mu.Lock() + if rr.done { + rr.mu.Unlock() + err = ErrClientConnClosing + return + } + + if len(rr.addrs) > 0 { + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + next := rr.next + for { + a := rr.addrs[next] + next = (next + 1) % len(rr.addrs) + if a.connected { + addr = a.addr + rr.next = next + rr.mu.Unlock() + return + } + if next == rr.next { + // Has iterated all the possible address but none is connected. + break + } + } + } + // The newly added addr got removed by Down() again. + if rr.waitCh == nil { + ch = make(chan struct{}) + rr.waitCh = ch + } else { + ch = rr.waitCh + } + rr.mu.Unlock() + } + } +} + +func (rr *roundRobin) Notify() <-chan []Address { + return rr.addrCh +} + +func (rr *roundRobin) Close() error { + rr.mu.Lock() + defer rr.mu.Unlock() + if rr.done { + return errBalancerClosed + } + rr.done = true + if rr.w != nil { + rr.w.Close() + } + if rr.waitCh != nil { + close(rr.waitCh) + rr.waitCh = nil + } + if rr.addrCh != nil { + close(rr.addrCh) + } + return nil +} + +// pickFirst is used to test multi-addresses in one addrConn in which all addresses share the same addrConn. +// It is a wrapper around roundRobin balancer. The logic of all methods works fine because balancer.Get() +// returns the only address Up by resetTransport(). +type pickFirst struct { + *roundRobin +} diff --git a/balancer/balancer.go b/balancer/balancer.go index 6266981a938..531a174a839 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" - "google.golang.org/grpc/status" ) var ( @@ -124,7 +123,7 @@ type State struct { // determine the state of the ClientConn. ConnectivityState connectivity.State // Picker is used to choose connections (SubConns) for RPCs. - Picker Picker + Picker V2Picker } // ClientConn represents a gRPC ClientConn. @@ -142,11 +141,20 @@ type ClientConn interface { // The SubConn will be shutdown. RemoveSubConn(SubConn) + // UpdateBalancerState is called by balancer to notify gRPC that some internal + // state in balancer has changed. + // + // gRPC will update the connectivity state of the ClientConn, and will call pick + // on the new picker to pick new SubConn. + // + // Deprecated: use UpdateState instead + UpdateBalancerState(s connectivity.State, p Picker) + // UpdateState notifies gRPC that the balancer's internal state has // changed. // - // gRPC will update the connectivity state of the ClientConn, and will call - // pick on the new picker to pick new SubConns. + // gRPC will update the connectivity state of the ClientConn, and will call pick + // on the new picker to pick new SubConns. UpdateState(State) // ResolveNow is called by balancer to notify gRPC to do a name resolving. @@ -229,13 +237,56 @@ type DoneInfo struct { var ( // ErrNoSubConnAvailable indicates no SubConn is available for pick(). - // gRPC will block the RPC until a new picker is available via UpdateState(). + // gRPC will block the RPC until a new picker is available via UpdateBalancerState(). ErrNoSubConnAvailable = errors.New("no SubConn is available") // ErrTransientFailure indicates all SubConns are in TransientFailure. // WaitForReady RPCs will block, non-WaitForReady RPCs will fail. - ErrTransientFailure = errors.New("all SubConns are in TransientFailure") + ErrTransientFailure = TransientFailureError(errors.New("all SubConns are in TransientFailure")) ) +// Picker is used by gRPC to pick a SubConn to send an RPC. +// Balancer is expected to generate a new picker from its snapshot every time its +// internal state has changed. +// +// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState(). +// +// Deprecated: use V2Picker instead +type Picker interface { + // Pick returns the SubConn to be used to send the RPC. + // The returned SubConn must be one returned by NewSubConn(). + // + // This functions is expected to return: + // - a SubConn that is known to be READY; + // - ErrNoSubConnAvailable if no SubConn is available, but progress is being + // made (for example, some SubConn is in CONNECTING mode); + // - other errors if no active connecting is happening (for example, all SubConn + // are in TRANSIENT_FAILURE mode). + // + // If a SubConn is returned: + // - If it is READY, gRPC will send the RPC on it; + // - If it is not ready, or becomes not ready after it's returned, gRPC will + // block until UpdateBalancerState() is called and will call pick on the + // new picker. The done function returned from Pick(), if not nil, will be + // called with nil error, no bytes sent and no bytes received. + // + // If the returned error is not nil: + // - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState() + // - If the error is ErrTransientFailure or implements IsTransientFailure() + // bool, returning true: + // - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState() + // is called to pick again; + // - Otherwise, RPC will fail with unavailable error. + // - Else (error is other non-nil error): + // - The RPC will fail with the error's status code, or Unknown if it is + // not a status error. + // + // The returned done() function will be called once the rpc has finished, + // with the final status of that RPC. If the SubConn returned is not a + // valid SubConn type, done may not be called. done may be nil if balancer + // doesn't care about the RPC status. + Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error) +} + // PickResult contains information related to a connection chosen for an RPC. type PickResult struct { // SubConn is the connection to use for this pick, if its state is Ready. @@ -251,36 +302,24 @@ type PickResult struct { Done func(DoneInfo) } -type dropRPCError struct { +type transientFailureError struct { error - status *status.Status } -func (e *dropRPCError) DropRPC() bool { return true } - -func (e *dropRPCError) GRPCStatus() *status.Status { return e.status } +func (e *transientFailureError) IsTransientFailure() bool { return true } -// DropRPCError wraps err in an error implementing DropRPC() bool, returning -// true. If err is not a status error, it will be converted to one with code -// Unknown and the message containing the err.Error() result. -func DropRPCError(err error) error { - st := status.Convert(err) - return &dropRPCError{error: st.Err(), status: st} +// TransientFailureError wraps err in an error implementing +// IsTransientFailure() bool, returning true. +func TransientFailureError(err error) error { + return &transientFailureError{error: err} } -// TransientFailureError returns e. It exists for backward compatibility and -// will be deleted soon. -// -// Deprecated: no longer necessary, picker errors are treated this way by -// default. -func TransientFailureError(e error) error { return e } - -// Picker is used by gRPC to pick a SubConn to send an RPC. +// V2Picker is used by gRPC to pick a SubConn to send an RPC. // Balancer is expected to generate a new picker from its snapshot every time its // internal state has changed. // -// The pickers used by gRPC can be updated by ClientConn.UpdateState(). -type Picker interface { +// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState(). +type V2Picker interface { // Pick returns the connection to use for this RPC and related information. // // Pick should not block. If the balancer needs to do I/O or any blocking @@ -293,14 +332,14 @@ type Picker interface { // - If the error is ErrNoSubConnAvailable, gRPC will block until a new // Picker is provided by the balancer (using ClientConn.UpdateState). // - // - If the error implements DropRPC() bool, returning true, gRPC will - // terminate all RPCs with the code and message provided. If the error - // is not a status error, it will be converted by gRPC to a status error - // with code Unknown. + // - If the error implements IsTransientFailure() bool, returning true, + // wait for ready RPCs will wait, but non-wait for ready RPCs will be + // terminated with this error's Error() string and status code + // Unavailable. // - // - For all other errors, wait for ready RPCs will wait, but non-wait for - // ready RPCs will be terminated with this error's Error() string and - // status code Unavailable. + // - Any other errors terminate all RPCs with the code and message + // provided. If the error is not a status error, it will be converted by + // gRPC to a status error with code Unknown. Pick(info PickInfo) (PickResult, error) } @@ -309,36 +348,34 @@ type Picker interface { // // It also generates and updates the Picker used by gRPC to pick SubConns for RPCs. // -// UpdateClientConnState, ResolverError, UpdateSubConnState, and Close are -// guaranteed to be called synchronously from the same goroutine. There's no -// guarantee on picker.Pick, it may be called anytime. +// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed +// to be called synchronously from the same goroutine. +// There's no guarantee on picker.Pick, it may be called anytime. type Balancer interface { - // UpdateClientConnState is called by gRPC when the state of the ClientConn - // changes. If the error returned is ErrBadResolverState, the ClientConn - // will begin calling ResolveNow on the active name resolver with - // exponential backoff until a subsequent call to UpdateClientConnState - // returns a nil error. Any other errors are currently ignored. - UpdateClientConnState(ClientConnState) error - // ResolverError is called by gRPC when the name resolver reports an error. - ResolverError(error) - // UpdateSubConnState is called by gRPC when the state of a SubConn - // changes. - UpdateSubConnState(SubConn, SubConnState) + // HandleSubConnStateChange is called by gRPC when the connectivity state + // of sc has changed. + // Balancer is expected to aggregate all the state of SubConn and report + // that back to gRPC. + // Balancer should also generate and update Pickers when its internal state has + // been changed by the new state. + // + // Deprecated: if V2Balancer is implemented by the Balancer, + // UpdateSubConnState will be called instead. + HandleSubConnStateChange(sc SubConn, state connectivity.State) + // HandleResolvedAddrs is called by gRPC to send updated resolved addresses to + // balancers. + // Balancer can create new SubConn or remove SubConn with the addresses. + // An empty address slice and a non-nil error will be passed if the resolver returns + // non-nil error to gRPC. + // + // Deprecated: if V2Balancer is implemented by the Balancer, + // UpdateClientConnState will be called instead. + HandleResolvedAddrs([]resolver.Address, error) // Close closes the balancer. The balancer is not required to call // ClientConn.RemoveSubConn for its existing SubConns. Close() } -// V2Balancer is temporarily defined for backward compatibility reasons. -// -// Deprecated: use Balancer directly instead. -type V2Balancer = Balancer - -// V2Picker is temporarily defined for backward compatibility reasons. -// -// Deprecated: use Picker directly instead. -type V2Picker = Picker - // SubConnState describes the state of a SubConn. type SubConnState struct { // ConnectivityState is the connectivity state of the SubConn. @@ -361,6 +398,27 @@ type ClientConnState struct { // problem with the provided name resolver data. var ErrBadResolverState = errors.New("bad resolver state") +// V2Balancer is defined for documentation purposes. If a Balancer also +// implements V2Balancer, its UpdateClientConnState method will be called +// instead of HandleResolvedAddrs and its UpdateSubConnState will be called +// instead of HandleSubConnStateChange. +type V2Balancer interface { + // UpdateClientConnState is called by gRPC when the state of the ClientConn + // changes. If the error returned is ErrBadResolverState, the ClientConn + // will begin calling ResolveNow on the active name resolver with + // exponential backoff until a subsequent call to UpdateClientConnState + // returns a nil error. Any other errors are currently ignored. + UpdateClientConnState(ClientConnState) error + // ResolverError is called by gRPC when the name resolver reports an error. + ResolverError(error) + // UpdateSubConnState is called by gRPC when the state of a SubConn + // changes. + UpdateSubConnState(SubConn, SubConnState) + // Close closes the balancer. The balancer is not required to call + // ClientConn.RemoveSubConn for its existing SubConns. + Close() +} + // ConnectivityStateEvaluator takes the connectivity states of multiple SubConns // and returns one aggregated connectivity state. // diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index ffe7149fa0a..d952f09f345 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -19,6 +19,7 @@ package base import ( + "context" "errors" "google.golang.org/grpc/balancer" @@ -28,15 +29,17 @@ import ( ) type baseBuilder struct { - name string - pickerBuilder PickerBuilder - config Config + name string + pickerBuilder PickerBuilder + v2PickerBuilder V2PickerBuilder + config Config } func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { bal := &baseBalancer{ - cc: cc, - pickerBuilder: bb.pickerBuilder, + cc: cc, + pickerBuilder: bb.pickerBuilder, + v2PickerBuilder: bb.v2PickerBuilder, subConns: make(map[resolver.Address]balancer.SubConn), scStates: make(map[balancer.SubConn]connectivity.State), @@ -48,6 +51,8 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) // may call UpdateState with this picker. if bb.pickerBuilder != nil { bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable) + } else { + bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable) } return bal } @@ -56,9 +61,12 @@ func (bb *baseBuilder) Name() string { return bb.name } +var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer + type baseBalancer struct { - cc balancer.ClientConn - pickerBuilder PickerBuilder + cc balancer.ClientConn + pickerBuilder PickerBuilder + v2PickerBuilder V2PickerBuilder csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State @@ -66,19 +74,27 @@ type baseBalancer struct { subConns map[resolver.Address]balancer.SubConn scStates map[balancer.SubConn]connectivity.State picker balancer.Picker + v2Picker balancer.V2Picker config Config } +func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + panic("not implemented") +} + func (b *baseBalancer) ResolverError(err error) { switch b.state { case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: if b.picker != nil { b.picker = NewErrPicker(err) + } else { + b.v2Picker = NewErrPickerV2(err) } } } func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + // TODO: handle s.ResolverState.Err (log if not nil) once implemented. // TODO: handle s.ResolverState.ServiceConfig? if grpclog.V(2) { grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s) @@ -105,7 +121,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { b.cc.RemoveSubConn(sc) delete(b.subConns, a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. - // The entry will be deleted in UpdateSubConnState. + // The entry will be deleted in HandleSubConnStateChange. } } return nil @@ -117,26 +133,46 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // - built by the pickerBuilder with all READY SubConns otherwise. func (b *baseBalancer) regeneratePicker(err error) { if b.state == connectivity.TransientFailure { - if err != nil { - b.picker = NewErrPicker(err) + if b.pickerBuilder != nil { + b.picker = NewErrPicker(balancer.ErrTransientFailure) } else { - // This means the last subchannel transition was not to - // TransientFailure (otherwise err must be set), but the - // aggregate state of the balancer is TransientFailure, meaning - // there are no other addresses. - b.picker = NewErrPicker(errors.New("resolver returned no addresses")) + if err != nil { + b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err)) + } else { + // This means the last subchannel transition was not to + // TransientFailure (otherwise err must be set), but the + // aggregate state of the balancer is TransientFailure, meaning + // there are no other addresses. + b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses"))) + } } return } - readySCs := make(map[balancer.SubConn]SubConnInfo) + if b.pickerBuilder != nil { + readySCs := make(map[resolver.Address]balancer.SubConn) - // Filter out all ready SCs from full subConn map. - for addr, sc := range b.subConns { - if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { - readySCs[sc] = SubConnInfo{Address: addr} + // Filter out all ready SCs from full subConn map. + for addr, sc := range b.subConns { + if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { + readySCs[addr] = sc + } } + b.picker = b.pickerBuilder.Build(readySCs) + } else { + readySCs := make(map[balancer.SubConn]SubConnInfo) + + // Filter out all ready SCs from full subConn map. + for addr, sc := range b.subConns { + if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { + readySCs[sc] = SubConnInfo{Address: addr} + } + } + b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) } - b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) +} + +func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + panic("not implemented") } func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { @@ -174,7 +210,11 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su b.regeneratePicker(state.ConnectionError) } - b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) + if b.picker != nil { + b.cc.UpdateBalancerState(b.state, b.picker) + } else { + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker}) + } } // Close is a nop because base balancer doesn't have internal state to clean up, @@ -182,20 +222,28 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su func (b *baseBalancer) Close() { } -// NewErrPicker returns a Picker that always returns err on Pick(). +// NewErrPicker returns a picker that always returns err on Pick(). func NewErrPicker(err error) balancer.Picker { return &errPicker{err: err} } -// NewErrPickerV2 is temporarily defined for backward compatibility reasons. -// -// Deprecated: use NewErrPicker instead. -var NewErrPickerV2 = NewErrPicker - type errPicker struct { err error // Pick() always returns this err. } -func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { +func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { + return nil, nil, p.err +} + +// NewErrPickerV2 returns a V2Picker that always returns err on Pick(). +func NewErrPickerV2(err error) balancer.V2Picker { + return &errPickerV2{err: err} +} + +type errPickerV2 struct { + err error // Pick() always returns this err. +} + +func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) { return balancer.PickResult{}, p.err } diff --git a/balancer/base/base.go b/balancer/base/base.go index c4fc89111bf..4192918b9e2 100644 --- a/balancer/base/base.go +++ b/balancer/base/base.go @@ -37,8 +37,15 @@ import ( // PickerBuilder creates balancer.Picker. type PickerBuilder interface { + // Build takes a slice of ready SubConns, and returns a picker that will be + // used by gRPC to pick a SubConn. + Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker +} + +// V2PickerBuilder creates balancer.V2Picker. +type V2PickerBuilder interface { // Build returns a picker that will be used by gRPC to pick a SubConn. - Build(info PickerBuildInfo) balancer.Picker + Build(info PickerBuildInfo) balancer.V2Picker } // PickerBuildInfo contains information needed by the picker builder to @@ -55,14 +62,20 @@ type SubConnInfo struct { Address resolver.Address // the address used to create this SubConn } +// NewBalancerBuilder returns a balancer builder. The balancers +// built by this builder will use the picker builder to build pickers. +func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder { + return NewBalancerBuilderWithConfig(name, pb, Config{}) +} + // Config contains the config info about the base balancer builder. type Config struct { // HealthCheck indicates whether health checking should be enabled for this specific balancer. HealthCheck bool } -// NewBalancerBuilder returns a base balancer builder configured by the provided config. -func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder { +// NewBalancerBuilderWithConfig returns a base balancer builder configured by the provided config. +func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) balancer.Builder { return &baseBuilder{ name: name, pickerBuilder: pb, @@ -70,13 +83,11 @@ func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.B } } -// NewBalancerBuilderV2 is temporarily defined for backward compatibility -// reasons. -// -// Deprecated: use NewBalancerBuilder instead. -var NewBalancerBuilderV2 = NewBalancerBuilder - -// V2PickerBuilder is temporarily defined for backward compatibility reasons. -// -// Deprecated: use PickerBuilder instead. -type V2PickerBuilder = PickerBuilder +// NewBalancerBuilderV2 returns a base balancer builder configured by the provided config. +func NewBalancerBuilderV2(name string, pb V2PickerBuilder, config Config) balancer.Builder { + return &baseBuilder{ + name: name, + v2PickerBuilder: pb, + config: config, + } +} diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 77ad01aad61..1e34786834b 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -163,6 +163,8 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal return lb } +var _ balancer.V2Balancer = (*lbBalancer)(nil) // Assert that we implement V2Balancer + type lbBalancer struct { cc *lbCacheClientConn target string @@ -212,7 +214,7 @@ type lbBalancer struct { state connectivity.State subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns. - picker balancer.Picker + picker balancer.V2Picker // Support fallback to resolved backend addresses if there's no response // from remote balancer within fallbackTimeout. remoteBalancerConnected bool @@ -310,6 +312,10 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { return connectivity.TransientFailure } +func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + panic("not used") +} + func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { s := scs.ConnectivityState if grpclog.V(2) { @@ -387,6 +393,13 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { lb.mu.Unlock() } +// HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB +// clientConn. The remoteLB clientConn will handle creating/removing remoteLB +// connections. +func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + panic("not used") +} + func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) { lb.mu.Lock() defer lb.mu.Unlock() diff --git a/balancer/grpclb/grpclb_picker.go b/balancer/grpclb/grpclb_picker.go index 59f871d7814..39bc5cc71e8 100644 --- a/balancer/grpclb/grpclb_picker.go +++ b/balancer/grpclb/grpclb_picker.go @@ -172,7 +172,7 @@ func (p *lbPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { // If it's a drop, return an error and fail the RPC. if s.Drop { p.stats.drop(s.LoadBalanceToken) - return balancer.PickResult{}, balancer.DropRPCError(status.Errorf(codes.Unavailable, "request dropped by grpclb")) + return balancer.PickResult{}, status.Errorf(codes.Unavailable, "request dropped by grpclb") } // If not a drop but there's no ready subConns. diff --git a/balancer/roundrobin/roundrobin.go b/balancer/roundrobin/roundrobin.go index a02b372cf20..d4d645501c1 100644 --- a/balancer/roundrobin/roundrobin.go +++ b/balancer/roundrobin/roundrobin.go @@ -35,7 +35,7 @@ const Name = "round_robin" // newBuilder creates a new roundrobin balancer builder. func newBuilder() balancer.Builder { - return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true}) + return base.NewBalancerBuilderV2(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true}) } func init() { @@ -44,10 +44,10 @@ func init() { type rrPickerBuilder struct{} -func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { +func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker { grpclog.Infof("roundrobinPicker: newPicker called with info: %v", info) if len(info.ReadySCs) == 0 { - return base.NewErrPicker(balancer.ErrNoSubConnAvailable) + return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable) } var scs []balancer.SubConn for sc := range info.ReadySCs { diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index bc594f75732..824f28e740a 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -74,7 +74,11 @@ func (ccb *ccBalancerWrapper) watcher() { } ccb.balancerMu.Lock() su := t.(*scStateUpdate) - ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err}) + if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { + ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err}) + } else { + ccb.balancer.HandleSubConnStateChange(su.sc, su.state) + } ccb.balancerMu.Unlock() case <-ccb.done.Done(): } @@ -119,13 +123,19 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { ccb.balancerMu.Lock() defer ccb.balancerMu.Unlock() - return ccb.balancer.UpdateClientConnState(*ccs) + if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { + return ub.UpdateClientConnState(*ccs) + } + ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil) + return nil } func (ccb *ccBalancerWrapper) resolverError(err error) { - ccb.balancerMu.Lock() - ccb.balancer.ResolverError(err) - ccb.balancerMu.Unlock() + if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { + ccb.balancerMu.Lock() + ub.ResolverError(err) + ccb.balancerMu.Unlock() + } } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { @@ -163,6 +173,21 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } +func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + ccb.mu.Lock() + defer ccb.mu.Unlock() + if ccb.subConns == nil { + return + } + // Update picker before updating state. Even though the ordering here does + // not matter, it can lead to multiple calls of Pick in the common start-up + // case where we wait for ready and then perform an RPC. If the picker is + // updated later, we could call the "connecting" picker when the state is + // updated, and then call the "ready" picker after the picker gets updated. + ccb.cc.blockingpicker.updatePicker(p) + ccb.cc.csMgr.updateState(s) +} + func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { ccb.mu.Lock() defer ccb.mu.Unlock() @@ -174,7 +199,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { // case where we wait for ready and then perform an RPC. If the picker is // updated later, we could call the "connecting" picker when the state is // updated, and then call the "ready" picker after the picker gets updated. - ccb.cc.blockingpicker.updatePicker(s.Picker) + ccb.cc.blockingpicker.updatePickerV2(s.Picker) ccb.cc.csMgr.updateState(s.ConnectivityState) } diff --git a/balancer_conn_wrappers_test.go b/balancer_conn_wrappers_test.go index 2e79d626ea5..a29a7dd8fad 100644 --- a/balancer_conn_wrappers_test.go +++ b/balancer_conn_wrappers_test.go @@ -23,19 +23,50 @@ import ( "testing" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" ) +var _ balancer.V2Balancer = &funcBalancer{} + +type funcBalancer struct { + updateClientConnState func(s balancer.ClientConnState) error +} + +func (*funcBalancer) HandleSubConnStateChange(balancer.SubConn, connectivity.State) { + panic("unimplemented") // v1 API +} +func (*funcBalancer) HandleResolvedAddrs([]resolver.Address, error) { + panic("unimplemented") // v1 API +} +func (b *funcBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + return b.updateClientConnState(s) +} +func (*funcBalancer) ResolverError(error) {} +func (*funcBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { + panic("unimplemented") // we never have sub-conns +} +func (*funcBalancer) Close() {} + +type funcBalancerBuilder struct { + name string + instance *funcBalancer +} + +func (b *funcBalancerBuilder) Build(balancer.ClientConn, balancer.BuildOptions) balancer.Balancer { + return b.instance +} +func (b *funcBalancerBuilder) Name() string { return b.name } + // TestBalancerErrorResolverPolling injects balancer errors and verifies // ResolveNow is called on the resolver with the appropriate backoff strategy // being consulted between ResolveNow calls. func (s) TestBalancerErrorResolverPolling(t *testing.T) { // The test balancer will return ErrBadResolverState iff the // ClientConnState contains no addresses. - bf := stub.BalancerFuncs{ - UpdateClientConnState: func(_ *stub.BalancerData, s balancer.ClientConnState) error { + fb := &funcBalancer{ + updateClientConnState: func(s balancer.ClientConnState) error { if len(s.ResolverState.Addresses) == 0 { return balancer.ErrBadResolverState } @@ -43,7 +74,7 @@ func (s) TestBalancerErrorResolverPolling(t *testing.T) { }, } const balName = "BalancerErrorResolverPolling" - stub.Register(balName, bf) + balancer.Register(&funcBalancerBuilder{name: balName, instance: fb}) testResolverErrorPolling(t, func(r *manual.Resolver) { diff --git a/balancer_switching_test.go b/balancer_switching_test.go index f47754bfdfe..1ccbaba6cdc 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -47,13 +48,9 @@ func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) ba return b } -func (b *magicalLB) ResolverError(error) {} +func (b *magicalLB) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {} -func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {} - -func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error { - return nil -} +func (b *magicalLB) HandleResolvedAddrs([]resolver.Address, error) {} func (b *magicalLB) Close() {} @@ -61,21 +58,6 @@ func init() { balancer.Register(&magicalLB{}) } -func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) { - var servers []*server - for i := 0; i < numServers; i++ { - s := newTestServer() - servers = append(servers, s) - go s.start(t, 0, maxStreams) - s.wait(t, 2*time.Second) - } - return servers, func() { - for i := 0; i < numServers; i++ { - servers[i].stop() - } - } -} - func checkPickFirst(cc *ClientConn, servers []*server) error { var ( req = "port" @@ -151,7 +133,7 @@ func (s) TestSwitchBalancer(t *testing.T) { defer rcleanup() const numServers = 2 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) @@ -183,7 +165,7 @@ func (s) TestBalancerDialOption(t *testing.T) { defer rcleanup() const numServers = 2 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name)) @@ -499,7 +481,7 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) { defer rcleanup() const numServers = 3 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) diff --git a/balancer_test.go b/balancer_test.go new file mode 100644 index 00000000000..524fd43a6f5 --- /dev/null +++ b/balancer_test.go @@ -0,0 +1,789 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "context" + "fmt" + "math" + "strconv" + "sync" + "testing" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/naming" + "google.golang.org/grpc/status" +) + +func pickFirstBalancerV1(r naming.Resolver) Balancer { + return &pickFirst{&roundRobin{r: r}} +} + +type testWatcher struct { + // the channel to receives name resolution updates + update chan *naming.Update + // the side channel to get to know how many updates in a batch + side chan int + // the channel to notify update injector that the update reading is done + readDone chan int +} + +func (w *testWatcher) Next() (updates []*naming.Update, err error) { + n := <-w.side + if n == 0 { + return nil, fmt.Errorf("w.side is closed") + } + for i := 0; i < n; i++ { + u := <-w.update + if u != nil { + updates = append(updates, u) + } + } + w.readDone <- 0 + return +} + +func (w *testWatcher) Close() { + close(w.side) +} + +// Inject naming resolution updates to the testWatcher. +func (w *testWatcher) inject(updates []*naming.Update) { + w.side <- len(updates) + for _, u := range updates { + w.update <- u + } + <-w.readDone +} + +type testNameResolver struct { + w *testWatcher + addr string +} + +func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) { + r.w = &testWatcher{ + update: make(chan *naming.Update, 1), + side: make(chan int, 1), + readDone: make(chan int), + } + r.w.side <- 1 + r.w.update <- &naming.Update{ + Op: naming.Add, + Addr: r.addr, + } + go func() { + <-r.w.readDone + }() + return r.w, nil +} + +func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) { + var servers []*server + for i := 0; i < numServers; i++ { + s := newTestServer() + servers = append(servers, s) + go s.start(t, 0, maxStreams) + s.wait(t, 2*time.Second) + } + // Point to server[0] + addr := "localhost:" + servers[0].port + return servers, &testNameResolver{ + addr: addr, + }, func() { + for i := 0; i < numServers; i++ { + servers[i].stop() + } + } +} + +func (s) TestNameDiscovery(t *testing.T) { + // Start 2 servers on 2 ports. + numServers := 2 + servers, r, cleanup := startServers(t, numServers, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + req := "port" + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { + t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) + } + // Inject the name resolution change to remove servers[0] and add servers[1]. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + }) + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + }) + r.w.inject(updates) + // Loop until the rpcs in flight talks to servers[1]. + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } +} + +func (s) TestEmptyAddrs(t *testing.T) { + servers, r, cleanup := startServers(t, 1, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { + t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, ", err, reply, expectedResponse) + } + // Inject name resolution change to remove the server so that there is no address + // available after that. + u := &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + } + r.w.inject([]*naming.Update{u}) + // Loop until the above updates apply. + for { + time.Sleep(10 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil { + cancel() + break + } + cancel() + } +} + +func (s) TestRoundRobin(t *testing.T) { + // Start 3 servers on 3 ports. + numServers := 3 + servers, r, cleanup := startServers(t, numServers, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + // Add servers[1] to the service discovery. + u := &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + } + r.w.inject([]*naming.Update{u}) + req := "port" + var reply string + // Loop until servers[1] is up + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } + // Add server2[2] to the service discovery. + u = &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[2].port, + } + r.w.inject([]*naming.Update{u}) + // Loop until both servers[2] are up. + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port { + break + } + time.Sleep(10 * time.Millisecond) + } + // Check the incoming RPCs served in a round-robin manner. + for i := 0; i < 10; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port) + } + } +} + +func (s) TestCloseWithPendingRPC(t *testing.T) { + servers, r, cleanup := startServers(t, 1, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { + t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) + } + // Remove the server. + updates := []*naming.Update{{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + }} + r.w.inject(updates) + // Loop until the above update applies. + for { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded { + cancel() + break + } + time.Sleep(10 * time.Millisecond) + cancel() + } + // Issue 2 RPCs which should be completed with error status once cc is closed. + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) + } + }() + go func() { + defer wg.Done() + var reply string + time.Sleep(5 * time.Millisecond) + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) + } + }() + time.Sleep(5 * time.Millisecond) + cc.Close() + wg.Wait() +} + +func (s) TestGetOnWaitChannel(t *testing.T) { + servers, r, cleanup := startServers(t, 1, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + // Remove all servers so that all upcoming RPCs will block on waitCh. + updates := []*naming.Update{{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + }} + r.w.inject(updates) + for { + var reply string + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded { + cancel() + break + } + cancel() + time.Sleep(10 * time.Millisecond) + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) + } + }() + // Add a connected server to get the above RPC through. + updates = []*naming.Update{{ + Op: naming.Add, + Addr: "localhost:" + servers[0].port, + }} + r.w.inject(updates) + // Wait until the above RPC succeeds. + wg.Wait() +} + +func (s) TestOneServerDown(t *testing.T) { + // Start 2 servers. + numServers := 2 + servers, r, cleanup := startServers(t, numServers, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + // Add servers[1] to the service discovery. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + }) + r.w.inject(updates) + req := "port" + var reply string + // Loop until servers[1] is up + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } + + var wg sync.WaitGroup + numRPC := 100 + sleepDuration := 10 * time.Millisecond + wg.Add(1) + go func() { + time.Sleep(sleepDuration) + // After sleepDuration, kill server[0]. + servers[0].stop() + wg.Done() + }() + + // All non-failfast RPCs should not block because there's at least one connection available. + for i := 0; i < numRPC; i++ { + wg.Add(1) + go func() { + time.Sleep(sleepDuration) + // After sleepDuration, invoke RPC. + // server[0] is killed around the same time to make it racy between balancer and gRPC internals. + cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true)) + wg.Done() + }() + } + wg.Wait() +} + +func (s) TestOneAddressRemoval(t *testing.T) { + // Start 2 servers. + numServers := 2 + servers, r, cleanup := startServers(t, numServers, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + // Add servers[1] to the service discovery. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + }) + r.w.inject(updates) + req := "port" + var reply string + // Loop until servers[1] is up + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } + + var wg sync.WaitGroup + numRPC := 100 + sleepDuration := 10 * time.Millisecond + wg.Add(1) + go func() { + time.Sleep(sleepDuration) + // After sleepDuration, delete server[0]. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + }) + r.w.inject(updates) + wg.Done() + }() + + // All non-failfast RPCs should not fail because there's at least one connection available. + for i := 0; i < numRPC; i++ { + wg.Add(1) + go func() { + var reply string + time.Sleep(sleepDuration) + // After sleepDuration, invoke RPC. + // server[0] is removed around the same time to make it racy between balancer and gRPC internals. + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err) + } + wg.Done() + }() + } + wg.Wait() +} + +func checkServerUp(t *testing.T, currentServer *server) { + req := "port" + port := currentServer.port + cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + var reply string + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port { + break + } + time.Sleep(10 * time.Millisecond) + } +} + +func (s) TestPickFirstEmptyAddrs(t *testing.T) { + servers, r, cleanup := startServers(t, 1, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse { + t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, ", err, reply, expectedResponse) + } + // Inject name resolution change to remove the server so that there is no address + // available after that. + u := &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + } + r.w.inject([]*naming.Update{u}) + // Loop until the above updates apply. + for { + time.Sleep(10 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil { + cancel() + break + } + cancel() + } +} + +func (s) TestPickFirstCloseWithPendingRPC(t *testing.T) { + servers, r, cleanup := startServers(t, 1, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { + t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) + } + // Remove the server. + updates := []*naming.Update{{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + }} + r.w.inject(updates) + // Loop until the above update applies. + for { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded { + cancel() + break + } + time.Sleep(10 * time.Millisecond) + cancel() + } + // Issue 2 RPCs which should be completed with error status once cc is closed. + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + var reply string + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) + } + }() + go func() { + defer wg.Done() + var reply string + time.Sleep(5 * time.Millisecond) + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) + } + }() + time.Sleep(5 * time.Millisecond) + cc.Close() + wg.Wait() +} + +func (s) TestPickFirstOrderAllServerUp(t *testing.T) { + // Start 3 servers on 3 ports. + numServers := 3 + servers, r, cleanup := startServers(t, numServers, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + // Add servers[1] and [2] to the service discovery. + u := &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + } + r.w.inject([]*naming.Update{u}) + + u = &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[2].port, + } + r.w.inject([]*naming.Update{u}) + + // Loop until all 3 servers are up + checkServerUp(t, servers[0]) + checkServerUp(t, servers[1]) + checkServerUp(t, servers[2]) + + // Check the incoming RPCs served in server[0] + req := "port" + var reply string + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) + } + time.Sleep(10 * time.Millisecond) + } + + // Delete server[0] in the balancer, the incoming RPCs served in server[1] + // For test addrconn, close server[0] instead + u = &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + } + r.w.inject([]*naming.Update{u}) + // Loop until it changes to server[1] + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) + } + time.Sleep(10 * time.Millisecond) + } + + // Add server[0] back to the balancer, the incoming RPCs served in server[1] + // Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port} + u = &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[0].port, + } + r.w.inject([]*naming.Update{u}) + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) + } + time.Sleep(10 * time.Millisecond) + } + + // Delete server[1] in the balancer, the incoming RPCs served in server[2] + u = &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[1].port, + } + r.w.inject([]*naming.Update{u}) + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port { + break + } + time.Sleep(1 * time.Second) + } + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port) + } + time.Sleep(10 * time.Millisecond) + } + + // Delete server[2] in the balancer, the incoming RPCs served in server[0] + u = &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[2].port, + } + r.w.inject([]*naming.Update{u}) + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port { + break + } + time.Sleep(1 * time.Second) + } + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) + } + time.Sleep(10 * time.Millisecond) + } +} + +func (s) TestPickFirstOrderOneServerDown(t *testing.T) { + // Start 3 servers on 3 ports. + numServers := 3 + servers, r, cleanup := startServers(t, numServers, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + // Add servers[1] and [2] to the service discovery. + u := &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + } + r.w.inject([]*naming.Update{u}) + + u = &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[2].port, + } + r.w.inject([]*naming.Update{u}) + + // Loop until all 3 servers are up + checkServerUp(t, servers[0]) + checkServerUp(t, servers[1]) + checkServerUp(t, servers[2]) + + // Check the incoming RPCs served in server[0] + req := "port" + var reply string + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) + } + time.Sleep(10 * time.Millisecond) + } + + // server[0] down, incoming RPCs served in server[1], but the order of Notify still remains + // {server[0] server[1] server[2]} + servers[0].stop() + // Loop until it changes to server[1] + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port { + break + } + time.Sleep(10 * time.Millisecond) + } + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) + } + time.Sleep(10 * time.Millisecond) + } + + // up the server[0] back, the incoming RPCs served in server[1] + p, _ := strconv.Atoi(servers[0].port) + servers[0] = newTestServer() + go servers[0].start(t, p, math.MaxUint32) + defer servers[0].stop() + servers[0].wait(t, 2*time.Second) + checkServerUp(t, servers[0]) + + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port) + } + time.Sleep(10 * time.Millisecond) + } + + // Delete server[1] in the balancer, the incoming RPCs served in server[0] + u = &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[1].port, + } + r.w.inject([]*naming.Update{u}) + for { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port { + break + } + time.Sleep(1 * time.Second) + } + for i := 0; i < 20; i++ { + if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port { + t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port) + } + time.Sleep(10 * time.Millisecond) + } +} + +func (s) TestPickFirstOneAddressRemoval(t *testing.T) { + // Start 2 servers. + numServers := 2 + servers, r, cleanup := startServers(t, numServers, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("Failed to create ClientConn: %v", err) + } + defer cc.Close() + // Add servers[1] to the service discovery. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + }) + r.w.inject(updates) + + // Create a new cc to Loop until servers[1] is up + checkServerUp(t, servers[0]) + checkServerUp(t, servers[1]) + + var wg sync.WaitGroup + numRPC := 100 + sleepDuration := 10 * time.Millisecond + wg.Add(1) + go func() { + time.Sleep(sleepDuration) + // After sleepDuration, delete server[0]. + var updates []*naming.Update + updates = append(updates, &naming.Update{ + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + }) + r.w.inject(updates) + wg.Done() + }() + + // All non-failfast RPCs should not fail because there's at least one connection available. + for i := 0; i < numRPC; i++ { + wg.Add(1) + go func() { + var reply string + time.Sleep(sleepDuration) + // After sleepDuration, invoke RPC. + // server[0] is removed around the same time to make it racy between balancer and gRPC internals. + if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil { + t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err) + } + wg.Done() + }() + } + wg.Wait() +} diff --git a/balancer_v1_wrapper.go b/balancer_v1_wrapper.go new file mode 100644 index 00000000000..db04b08b842 --- /dev/null +++ b/balancer_v1_wrapper.go @@ -0,0 +1,334 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "sync" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" +) + +type balancerWrapperBuilder struct { + b Balancer // The v1 balancer. +} + +func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + bwb.b.Start(opts.Target.Endpoint, BalancerConfig{ + DialCreds: opts.DialCreds, + Dialer: opts.Dialer, + }) + _, pickfirst := bwb.b.(*pickFirst) + bw := &balancerWrapper{ + balancer: bwb.b, + pickfirst: pickfirst, + cc: cc, + targetAddr: opts.Target.Endpoint, + startCh: make(chan struct{}), + conns: make(map[resolver.Address]balancer.SubConn), + connSt: make(map[balancer.SubConn]*scState), + csEvltr: &balancer.ConnectivityStateEvaluator{}, + state: connectivity.Idle, + } + cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: bw}) + go bw.lbWatcher() + return bw +} + +func (bwb *balancerWrapperBuilder) Name() string { + return "wrapper" +} + +type scState struct { + addr Address // The v1 address type. + s connectivity.State + down func(error) +} + +type balancerWrapper struct { + balancer Balancer // The v1 balancer. + pickfirst bool + + cc balancer.ClientConn + targetAddr string // Target without the scheme. + + mu sync.Mutex + conns map[resolver.Address]balancer.SubConn + connSt map[balancer.SubConn]*scState + // This channel is closed when handling the first resolver result. + // lbWatcher blocks until this is closed, to avoid race between + // - NewSubConn is created, cc wants to notify balancer of state changes; + // - Build hasn't return, cc doesn't have access to balancer. + startCh chan struct{} + + // To aggregate the connectivity state. + csEvltr *balancer.ConnectivityStateEvaluator + state connectivity.State +} + +// lbWatcher watches the Notify channel of the balancer and manages +// connections accordingly. +func (bw *balancerWrapper) lbWatcher() { + <-bw.startCh + notifyCh := bw.balancer.Notify() + if notifyCh == nil { + // There's no resolver in the balancer. Connect directly. + a := resolver.Address{ + Addr: bw.targetAddr, + Type: resolver.Backend, + } + sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) + if err != nil { + grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) + } else { + bw.mu.Lock() + bw.conns[a] = sc + bw.connSt[sc] = &scState{ + addr: Address{Addr: bw.targetAddr}, + s: connectivity.Idle, + } + bw.mu.Unlock() + sc.Connect() + } + return + } + + for addrs := range notifyCh { + grpclog.Infof("balancerWrapper: got update addr from Notify: %v", addrs) + if bw.pickfirst { + var ( + oldA resolver.Address + oldSC balancer.SubConn + ) + bw.mu.Lock() + for oldA, oldSC = range bw.conns { + break + } + bw.mu.Unlock() + if len(addrs) <= 0 { + if oldSC != nil { + // Teardown old sc. + bw.mu.Lock() + delete(bw.conns, oldA) + delete(bw.connSt, oldSC) + bw.mu.Unlock() + bw.cc.RemoveSubConn(oldSC) + } + continue + } + + var newAddrs []resolver.Address + for _, a := range addrs { + newAddr := resolver.Address{ + Addr: a.Addr, + Type: resolver.Backend, // All addresses from balancer are all backends. + ServerName: "", + Metadata: a.Metadata, + } + newAddrs = append(newAddrs, newAddr) + } + if oldSC == nil { + // Create new sc. + sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{}) + if err != nil { + grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err) + } else { + bw.mu.Lock() + // For pickfirst, there should be only one SubConn, so the + // address doesn't matter. All states updating (up and down) + // and picking should all happen on that only SubConn. + bw.conns[resolver.Address{}] = sc + bw.connSt[sc] = &scState{ + addr: addrs[0], // Use the first address. + s: connectivity.Idle, + } + bw.mu.Unlock() + sc.Connect() + } + } else { + bw.mu.Lock() + bw.connSt[oldSC].addr = addrs[0] + bw.mu.Unlock() + oldSC.UpdateAddresses(newAddrs) + } + } else { + var ( + add []resolver.Address // Addresses need to setup connections. + del []balancer.SubConn // Connections need to tear down. + ) + resAddrs := make(map[resolver.Address]Address) + for _, a := range addrs { + resAddrs[resolver.Address{ + Addr: a.Addr, + Type: resolver.Backend, // All addresses from balancer are all backends. + ServerName: "", + Metadata: a.Metadata, + }] = a + } + bw.mu.Lock() + for a := range resAddrs { + if _, ok := bw.conns[a]; !ok { + add = append(add, a) + } + } + for a, c := range bw.conns { + if _, ok := resAddrs[a]; !ok { + del = append(del, c) + delete(bw.conns, a) + // Keep the state of this sc in bw.connSt until its state becomes Shutdown. + } + } + bw.mu.Unlock() + for _, a := range add { + sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) + if err != nil { + grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) + } else { + bw.mu.Lock() + bw.conns[a] = sc + bw.connSt[sc] = &scState{ + addr: resAddrs[a], + s: connectivity.Idle, + } + bw.mu.Unlock() + sc.Connect() + } + } + for _, c := range del { + bw.cc.RemoveSubConn(c) + } + } + } +} + +func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + bw.mu.Lock() + defer bw.mu.Unlock() + scSt, ok := bw.connSt[sc] + if !ok { + return + } + if s == connectivity.Idle { + sc.Connect() + } + oldS := scSt.s + scSt.s = s + if oldS != connectivity.Ready && s == connectivity.Ready { + scSt.down = bw.balancer.Up(scSt.addr) + } else if oldS == connectivity.Ready && s != connectivity.Ready { + if scSt.down != nil { + scSt.down(errConnClosing) + } + } + sa := bw.csEvltr.RecordTransition(oldS, s) + if bw.state != sa { + bw.state = sa + } + bw.cc.UpdateState(balancer.State{ConnectivityState: bw.state, Picker: bw}) + if s == connectivity.Shutdown { + // Remove state for this sc. + delete(bw.connSt, sc) + } +} + +func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) { + bw.mu.Lock() + defer bw.mu.Unlock() + select { + case <-bw.startCh: + default: + close(bw.startCh) + } + // There should be a resolver inside the balancer. + // All updates here, if any, are ignored. +} + +func (bw *balancerWrapper) Close() { + bw.mu.Lock() + defer bw.mu.Unlock() + select { + case <-bw.startCh: + default: + close(bw.startCh) + } + bw.balancer.Close() +} + +// The picker is the balancerWrapper itself. +// It either blocks or returns error, consistent with v1 balancer Get(). +func (bw *balancerWrapper) Pick(info balancer.PickInfo) (result balancer.PickResult, err error) { + failfast := true // Default failfast is true. + if ss, ok := rpcInfoFromContext(info.Ctx); ok { + failfast = ss.failfast + } + a, p, err := bw.balancer.Get(info.Ctx, BalancerGetOptions{BlockingWait: !failfast}) + if err != nil { + return balancer.PickResult{}, toRPCErr(err) + } + if p != nil { + result.Done = func(balancer.DoneInfo) { p() } + defer func() { + if err != nil { + p() + } + }() + } + + bw.mu.Lock() + defer bw.mu.Unlock() + if bw.pickfirst { + // Get the first sc in conns. + for _, result.SubConn = range bw.conns { + return result, nil + } + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + } + var ok1 bool + result.SubConn, ok1 = bw.conns[resolver.Address{ + Addr: a.Addr, + Type: resolver.Backend, + ServerName: "", + Metadata: a.Metadata, + }] + s, ok2 := bw.connSt[result.SubConn] + if !ok1 || !ok2 { + // This can only happen due to a race where Get() returned an address + // that was subsequently removed by Notify. In this case we should + // retry always. + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + } + switch s.s { + case connectivity.Ready, connectivity.Idle: + return result, nil + case connectivity.Shutdown, connectivity.TransientFailure: + // If the returned sc has been shut down or is in transient failure, + // return error, and this RPC will fail or wait for another picker (if + // non-failfast). + return balancer.PickResult{}, balancer.ErrTransientFailure + default: + // For other states (connecting or unknown), the v1 balancer would + // traditionally wait until ready and then issue the RPC. Returning + // ErrNoSubConnAvailable will be a slight improvement in that it will + // allow the balancer to choose another address in case others are + // connected. + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + } +} diff --git a/clientconn.go b/clientconn.go index bf362d0a220..14ce9c76aa3 100644 --- a/clientconn.go +++ b/clientconn.go @@ -68,6 +68,8 @@ var ( errConnDrain = errors.New("grpc: the connection is drained") // errConnClosing indicates that the connection is closing. errConnClosing = errors.New("grpc: the connection is closing") + // errBalancerClosed indicates that the balancer is closed. + errBalancerClosed = errors.New("grpc: balancer is closed") // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default // service config. invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" diff --git a/clientconn_state_transition_test.go b/clientconn_state_transition_test.go index f3c0233c1ec..1309e894135 100644 --- a/clientconn_state_transition_test.go +++ b/clientconn_state_transition_test.go @@ -449,9 +449,9 @@ type stateRecordingBalancer struct { balancer.Balancer } -func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - b.notifier <- s.ConnectivityState - b.Balancer.UpdateSubConnState(sc, s) +func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + b.notifier <- s + b.Balancer.HandleSubConnStateChange(sc, s) } func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { diff --git a/clientconn_test.go b/clientconn_test.go index 10993290d27..02d15abbc4f 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -36,11 +36,21 @@ import ( internalbackoff "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/naming" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/testdata" ) +func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + var state connectivity.State + for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() { + } + return state, state == wantState +} + func (s) TestDialWithTimeout(t *testing.T) { lis, err := net.Listen("tcp", "localhost:0") if err != nil { @@ -350,6 +360,42 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { } +func (s) TestConnectivityStates(t *testing.T) { + servers, resolver, cleanup := startServers(t, 2, math.MaxUint32) + defer cleanup() + cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(resolver)), WithInsecure()) + if err != nil { + t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ ", err) + } + defer cc.Close() + wantState := connectivity.Ready + if state, ok := assertState(wantState, cc); !ok { + t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState) + } + // Send an update to delete the server connection (tearDown addrConn). + update := []*naming.Update{ + { + Op: naming.Delete, + Addr: "localhost:" + servers[0].port, + }, + } + resolver.w.inject(update) + wantState = connectivity.TransientFailure + if state, ok := assertState(wantState, cc); !ok { + t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState) + } + update[0] = &naming.Update{ + Op: naming.Add, + Addr: "localhost:" + servers[1].port, + } + resolver.w.inject(update) + wantState = connectivity.Ready + if state, ok := assertState(wantState, cc); !ok { + t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState) + } + +} + func (s) TestWithTimeout(t *testing.T) { conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTimeout(time.Millisecond), WithBlock(), WithInsecure()) if err == nil { @@ -545,6 +591,43 @@ func (s) TestDialContextFailFast(t *testing.T) { } } +// blockingBalancer mimics the behavior of balancers whose initialization takes a long time. +// In this test, reading from blockingBalancer.Notify() blocks forever. +type blockingBalancer struct { + ch chan []Address +} + +func newBlockingBalancer() Balancer { + return &blockingBalancer{ch: make(chan []Address)} +} +func (b *blockingBalancer) Start(target string, config BalancerConfig) error { + return nil +} +func (b *blockingBalancer) Up(addr Address) func(error) { + return nil +} +func (b *blockingBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) { + return Address{}, nil, nil +} +func (b *blockingBalancer) Notify() <-chan []Address { + return b.ch +} +func (b *blockingBalancer) Close() error { + close(b.ch) + return nil +} + +func (s) TestDialWithBlockingBalancer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + dialDone := make(chan struct{}) + go func() { + DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithBalancer(newBlockingBalancer())) + close(dialDone) + }() + cancel() + <-dialDone +} + // securePerRPCCredentials always requires transport security. type securePerRPCCredentials struct{} @@ -640,6 +723,50 @@ func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) { } } +// emptyBalancer returns an empty set of servers. +type emptyBalancer struct { + ch chan []Address +} + +func newEmptyBalancer() Balancer { + return &emptyBalancer{ch: make(chan []Address, 1)} +} +func (b *emptyBalancer) Start(_ string, _ BalancerConfig) error { + b.ch <- nil + return nil +} +func (b *emptyBalancer) Up(_ Address) func(error) { + return nil +} +func (b *emptyBalancer) Get(_ context.Context, _ BalancerGetOptions) (Address, func(), error) { + return Address{}, nil, nil +} +func (b *emptyBalancer) Notify() <-chan []Address { + return b.ch +} +func (b *emptyBalancer) Close() error { + close(b.ch) + return nil +} + +func (s) TestNonblockingDialWithEmptyBalancer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dialDone := make(chan error) + go func() { + dialDone <- func() error { + conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer())) + if err != nil { + return err + } + return conn.Close() + }() + }() + if err := <-dialDone; err != nil { + t.Fatalf("unexpected error dialing connection: %s", err) + } +} + func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) { r, rcleanup := manual.GenerateAndRegisterManualResolver() defer rcleanup() diff --git a/dialoptions.go b/dialoptions.go index 0f41d9b7272..9af3eef7ab3 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -199,6 +199,19 @@ func WithDecompressor(dc Decompressor) DialOption { }) } +// WithBalancer returns a DialOption which sets a load balancer with the v1 API. +// Name resolver will be ignored if this DialOption is specified. +// +// Deprecated: use the new balancer APIs in balancer package and +// WithBalancerName. Will be removed in a future 1.x release. +func WithBalancer(b Balancer) DialOption { + return newFuncDialOption(func(o *dialOptions) { + o.balancerBuilder = &balancerWrapperBuilder{ + b: b, + } + }) +} + // WithBalancerName sets the balancer that the ClientConn will be initialized // with. Balancer registered with balancerName will be used. This function // panics if no balancer was registered by balancerName. diff --git a/internal/balancer/stub/stub.go b/internal/balancer/stub/stub.go deleted file mode 100644 index 449365dfcde..00000000000 --- a/internal/balancer/stub/stub.go +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package stub implements a balancer for testing purposes. -package stub - -import "google.golang.org/grpc/balancer" - -// BalancerFuncs contains all balancer.Balancer functions with a preceding -// *BalancerData parameter for passing additional instance information. Any -// nil functions will never be called. -type BalancerFuncs struct { - // Init is called after ClientConn and BuildOptions are set in - // BalancerData. It may be used to initialize BalancerData.Data. - Init func(*BalancerData) - - UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error - ResolverError func(*BalancerData, error) - UpdateSubConnState func(*BalancerData, balancer.SubConn, balancer.SubConnState) - Close func(*BalancerData) -} - -// BalancerData contains data relevant to a stub balancer. -type BalancerData struct { - // ClientConn is set by the builder. - ClientConn balancer.ClientConn - // BuildOptions is set by the builder. - BuildOptions balancer.BuildOptions - // Data may be used to store arbitrary user data. - Data interface{} -} - -type bal struct { - bf BalancerFuncs - bd *BalancerData -} - -func (b *bal) UpdateClientConnState(c balancer.ClientConnState) error { - if b.bf.UpdateClientConnState != nil { - return b.bf.UpdateClientConnState(b.bd, c) - } - return nil -} -func (b *bal) ResolverError(e error) { - if b.bf.ResolverError != nil { - b.bf.ResolverError(b.bd, e) - } -} -func (b *bal) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { - if b.bf.UpdateSubConnState != nil { - b.bf.UpdateSubConnState(b.bd, sc, scs) - } -} -func (b *bal) Close() { - if b.bf.Close != nil { - b.bf.Close(b.bd) - } -} - -type bb struct { - name string - bf BalancerFuncs -} - -func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - b := &bal{bf: bb.bf, bd: &BalancerData{ClientConn: cc, BuildOptions: opts}} - if b.bf.Init != nil { - b.bf.Init(b.bd) - } - return b -} - -func (bb bb) Name() string { return bb.name } - -// Register registers a stub balancer builder which will call the provided -// functions. The name used should be unique. -func Register(name string, bf BalancerFuncs) { - balancer.Register(bb{name: name, bf: bf}) -} diff --git a/picker_wrapper.go b/picker_wrapper.go index a2e996d7295..00447894f07 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -20,6 +20,7 @@ package grpc import ( "context" + "fmt" "io" "sync" @@ -31,13 +32,33 @@ import ( "google.golang.org/grpc/status" ) +// v2PickerWrapper wraps a balancer.Picker while providing the +// balancer.V2Picker API. It requires a pickerWrapper to generate errors +// including the latest connectionError. To be deleted when balancer.Picker is +// updated to the balancer.V2Picker API. +type v2PickerWrapper struct { + picker balancer.Picker + connErr *connErr +} + +func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + sc, done, err := v.picker.Pick(info.Ctx, info) + if err != nil { + if err == balancer.ErrTransientFailure { + return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError())) + } + return balancer.PickResult{}, err + } + return balancer.PickResult{SubConn: sc, Done: done}, nil +} + // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick // actions and unblock when there's a picker update. type pickerWrapper struct { mu sync.Mutex done bool blockingCh chan struct{} - picker balancer.Picker + picker balancer.V2Picker // The latest connection error. TODO: remove when V1 picker is deprecated; // balancer should be responsible for providing the error. @@ -68,6 +89,11 @@ func newPickerWrapper() *pickerWrapper { // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. func (pw *pickerWrapper) updatePicker(p balancer.Picker) { + pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr}) +} + +// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. +func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) { pw.mu.Lock() if pw.done { pw.mu.Unlock() @@ -154,18 +180,18 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. if err == balancer.ErrNoSubConnAvailable { continue } - if drop, ok := err.(interface{ DropRPC() bool }); ok && drop.DropRPC() { - // End the RPC unconditionally. If not a status error, Convert - // will transform it into one with code Unknown. - return nil, nil, status.Convert(err).Err() + if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() { + if !failfast { + lastPickErr = err + continue + } + return nil, nil, status.Error(codes.Unavailable, err.Error()) } - // For all other errors, wait for ready RPCs should block and other - // RPCs should fail. - if !failfast { - lastPickErr = err - continue + if _, ok := status.FromError(err); ok { + return nil, nil, err } - return nil, nil, status.Error(codes.Unavailable, err.Error()) + // err is some other error. + return nil, nil, status.Error(codes.Unknown, err.Error()) } acw, ok := pickResult.SubConn.(*acBalancerWrapper) diff --git a/picker_wrapper_test.go b/picker_wrapper_test.go index 5f786b28580..50916a2dfde 100644 --- a/picker_wrapper_test.go +++ b/picker_wrapper_test.go @@ -55,14 +55,14 @@ type testingPicker struct { maxCalled int64 } -func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { +func (p *testingPicker) Pick(ctx context.Context, info balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) { if atomic.AddInt64(&p.maxCalled, -1) < 0 { - return balancer.PickResult{}, fmt.Errorf("pick called to many times (> goroutineCount)") + return nil, nil, fmt.Errorf("pick called to many times (> goroutineCount)") } if p.err != nil { - return balancer.PickResult{}, p.err + return nil, nil, p.err } - return balancer.PickResult{SubConn: p.sc}, nil + return p.sc, nil, nil } func (s) TestBlockingPickTimeout(t *testing.T) { diff --git a/pickfirst.go b/pickfirst.go index 7ba90ae5ab1..c43dac9ad84 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" ) @@ -51,6 +52,20 @@ type pickfirstBalancer struct { sc balancer.SubConn } +var _ balancer.V2Balancer = &pickfirstBalancer{} // Assert we implement v2 + +func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + if err != nil { + b.ResolverError(err) + return + } + b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}}) // Ignore error +} + +func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s}) +} + func (b *pickfirstBalancer) ResolverError(err error) { switch b.state { case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: @@ -114,9 +129,15 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S case connectivity.Connecting: b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}}) case connectivity.TransientFailure: + err := balancer.ErrTransientFailure + // TODO: this can be unconditional after the V1 API is removed, as + // SubConnState will always contain a connection error. + if s.ConnectionError != nil { + err = balancer.TransientFailureError(s.ConnectionError) + } b.cc.UpdateState(balancer.State{ ConnectivityState: s.ConnectivityState, - Picker: &picker{err: s.ConnectionError}, + Picker: &picker{err: err}, }) } } diff --git a/pickfirst_test.go b/pickfirst_test.go index a69cec1c51d..475eedfdf46 100644 --- a/pickfirst_test.go +++ b/pickfirst_test.go @@ -43,7 +43,7 @@ func (s) TestOneBackendPickfirst(t *testing.T) { defer rcleanup() numServers := 1 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) @@ -76,7 +76,7 @@ func (s) TestBackendsPickfirst(t *testing.T) { defer rcleanup() numServers := 2 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) @@ -109,7 +109,7 @@ func (s) TestNewAddressWhileBlockingPickfirst(t *testing.T) { defer rcleanup() numServers := 1 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) @@ -145,7 +145,7 @@ func (s) TestCloseWithPendingRPCPickfirst(t *testing.T) { defer rcleanup() numServers := 1 - _, scleanup := startServers(t, numServers, math.MaxInt32) + _, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) @@ -181,7 +181,7 @@ func (s) TestOneServerDownPickfirst(t *testing.T) { defer rcleanup() numServers := 2 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) @@ -222,7 +222,7 @@ func (s) TestAllServersDownPickfirst(t *testing.T) { defer rcleanup() numServers := 2 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) @@ -265,7 +265,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) { defer rcleanup() numServers := 3 - servers, scleanup := startServers(t, numServers, math.MaxInt32) + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) diff --git a/resolver_conn_wrapper_test.go b/resolver_conn_wrapper_test.go index 1dbf5371ab2..bf9c3c293ff 100644 --- a/resolver_conn_wrapper_test.go +++ b/resolver_conn_wrapper_test.go @@ -29,7 +29,6 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" - "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" @@ -185,12 +184,12 @@ const happyBalancerName = "happy balancer" func init() { // Register a balancer that never returns an error from // UpdateClientConnState, and doesn't do anything else either. - bf := stub.BalancerFuncs{ - UpdateClientConnState: func(*stub.BalancerData, balancer.ClientConnState) error { + fb := &funcBalancer{ + updateClientConnState: func(s balancer.ClientConnState) error { return nil }, } - stub.Register(happyBalancerName, bf) + balancer.Register(&funcBalancerBuilder{name: happyBalancerName, instance: fb}) } // TestResolverErrorPolling injects resolver errors and verifies ResolveNow is diff --git a/test/balancer_test.go b/test/balancer_test.go index 171876b880c..afa9ec1b7ea 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -20,17 +20,12 @@ package test import ( "context" - "errors" - "fmt" - "net" "reflect" "testing" "time" "google.golang.org/grpc" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" @@ -39,7 +34,6 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" "google.golang.org/grpc/testdata" ) @@ -49,7 +43,7 @@ const testBalancerName = "testbalancer" // testBalancer creates one subconn with the first address from resolved // addresses. // -// It's used to test options for NewSubConn are applied correctly. +// It's used to test options for NewSubConn are applies correctly. type testBalancer struct { cc balancer.ClientConn sc balancer.SubConn @@ -68,43 +62,37 @@ func (*testBalancer) Name() string { return testBalancerName } -func (*testBalancer) ResolverError(err error) { - panic("not implemented") -} - -func (b *testBalancer) UpdateClientConnState(state balancer.ClientConnState) error { +func (b *testBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { // Only create a subconn at the first time. - if b.sc == nil { - var err error - b.sc, err = b.cc.NewSubConn(state.ResolverState.Addresses, b.newSubConnOptions) + if err == nil && b.sc == nil { + b.sc, err = b.cc.NewSubConn(addrs, b.newSubConnOptions) if err != nil { grpclog.Errorf("testBalancer: failed to NewSubConn: %v", err) - return nil + return } b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &picker{sc: b.sc, bal: b}}) b.sc.Connect() } - return nil } -func (b *testBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { - grpclog.Infof("testBalancer: UpdateSubConnState: %p, %v", sc, s) +func (b *testBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + grpclog.Infof("testBalancer: HandleSubConnStateChange: %p, %v", sc, s) if b.sc != sc { grpclog.Infof("testBalancer: ignored state change because sc is not recognized") return } - if s.ConnectivityState == connectivity.Shutdown { + if s == connectivity.Shutdown { b.sc = nil return } - switch s.ConnectivityState { + switch s { case connectivity.Ready, connectivity.Idle: - b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{sc: sc, bal: b}}) + b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{sc: sc, bal: b}}) case connectivity.Connecting: - b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable, bal: b}}) + b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrNoSubConnAvailable, bal: b}}) case connectivity.TransientFailure: - b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrTransientFailure, bal: b}}) + b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrTransientFailure, bal: b}}) } } @@ -291,10 +279,6 @@ func newTestBalancerKeepAddresses() *testBalancerKeepAddresses { } } -func (testBalancerKeepAddresses) ResolverError(err error) { - panic("not implemented") -} - func (b *testBalancerKeepAddresses) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { return b } @@ -303,12 +287,11 @@ func (*testBalancerKeepAddresses) Name() string { return testBalancerKeepAddressesName } -func (b *testBalancerKeepAddresses) UpdateClientConnState(state balancer.ClientConnState) error { - b.addrsChan <- state.ResolverState.Addresses - return nil +func (b *testBalancerKeepAddresses) HandleResolvedAddrs(addrs []resolver.Address, err error) { + b.addrsChan <- addrs } -func (testBalancerKeepAddresses) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { +func (testBalancerKeepAddresses) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { panic("not used") } @@ -363,220 +346,3 @@ func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) { t.Fatalf("With both backend and grpclb addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses) } } - -// TestServersSwap creates two servers and verifies the client switches between -// them when the name resolver reports the first and then the second. -func (s) TestServersSwap(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Initialize servers - reg := func(username string) (addr string, cleanup func()) { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening. Err: %v", err) - } - s := grpc.NewServer() - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{Username: username}, nil - }, - } - testpb.RegisterTestServiceServer(s, ts) - go s.Serve(lis) - return lis.Addr().String(), s.Stop - } - const one = "1" - addr1, cleanup := reg(one) - defer cleanup() - const two = "2" - addr2, cleanup := reg(two) - defer cleanup() - - // Initialize client - r, cleanup := manual.GenerateAndRegisterManualResolver() - defer cleanup() - r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: addr1}}}) - cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure()) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer cc.Close() - client := testpb.NewTestServiceClient(cc) - - // Confirm we are connected to the first server - if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { - t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - - // Update resolver to report only the second server - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr2}}}) - - // Loop until new RPCs talk to server two. - for i := 0; i < 2000; i++ { - if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { - t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) - } else if res.Username == two { - break // pass - } - time.Sleep(5 * time.Millisecond) - } -} - -// TestEmptyAddrs verifies client behavior when a working connection is -// removed. In pick first, it will continue using the old connection but in -// round robin it will not. -func (s) TestEmptyAddrs(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Initialize server - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening. Err: %v", err) - } - s := grpc.NewServer() - defer s.Stop() - const one = "1" - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{Username: one}, nil - }, - } - testpb.RegisterTestServiceServer(s, ts) - go s.Serve(lis) - - // Initialize pickfirst client - pfr, cleanup := manual.GenerateAndRegisterManualResolver() - rn := make(chan struct{}) - pfr.ResolveNowCallback = func(resolver.ResolveNowOptions) { - rn <- struct{}{} - } - defer cleanup() - pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithInsecure()) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer pfcc.Close() - pfclient := testpb.NewTestServiceClient(pfcc) - - // Confirm we are connected to the server - if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { - t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - - // Remove all addresses. - pfr.UpdateState(resolver.State{}) - // Wait for a ResolveNow call on the pick first client's resolver. - select { - case <-rn: - case <-ctx.Done(): - t.Fatalf("ResolveNow() never invoked after providing empty addresses") - } - - // Initialize roundrobin client - rrr, cleanup := manual.GenerateAndRegisterManualResolver() - defer cleanup() - rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithInsecure(), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name))) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer rrcc.Close() - rrclient := testpb.NewTestServiceClient(rrcc) - - // Confirm we are connected to the server - if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { - t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - - // Remove all addresses. - rrr.UpdateState(resolver.State{}) - - // Confirm RPCs eventually fail on round robin. - for { - if _, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { - break - } - time.Sleep(5 * time.Millisecond) - } - if ctx.Err() != nil { - t.Fatalf("round robin client did not fail after 5 seconds") - } - - // Confirm several new RPCs succeed on pick first. Because rr failed - // before this, but we initialized and updated pf first, we should not need - // to check this success case for as long. - for i := 0; i < 10; i++ { - if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { - t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) - } - time.Sleep(5 * time.Millisecond) - } -} - -func (s) TestWaitForReady(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Initialize server - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening. Err: %v", err) - } - s := grpc.NewServer() - defer s.Stop() - const one = "1" - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{Username: one}, nil - }, - } - testpb.RegisterTestServiceServer(s, ts) - go s.Serve(lis) - - // Initialize client - r, cleanup := manual.GenerateAndRegisterManualResolver() - defer cleanup() - - cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure()) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer cc.Close() - client := testpb.NewTestServiceClient(cc) - - // Report an error so non-WFR RPCs will give up early. - r.CC.ReportError(errors.New("fake resolver error")) - - // Ensure the client is not connected to anything and fails non-WFR RPCs. - if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Unavailable { - t.Fatalf("UnaryCall(_) = %v, %v; want _, Code()=%v", res, err, codes.Unavailable) - } - - errChan := make(chan error, 1) - go func() { - if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.WaitForReady(true)); err != nil || res.Username != one { - errChan <- fmt.Errorf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - close(errChan) - }() - - select { - case err := <-errChan: - t.Errorf("unexpected receive from errChan before addresses provided") - t.Fatal(err.Error()) - case <-time.After(5 * time.Millisecond): - } - - // Resolve the server. The WFR RPC should unblock and use it. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - if err := <-errChan; err != nil { - t.Fatal(err.Error()) - } -} diff --git a/test/creds_test.go b/test/creds_test.go index aff2a1ada23..275165c8cd7 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -66,7 +66,7 @@ func (c *testCredsBundle) NewWithMode(mode string) (credentials.Bundle, error) { } func (s) TestCredsBundleBoth(t *testing.T) { - te := newTest(t, env{name: "creds-bundle", network: "tcp", security: "empty"}) + te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: "v1", security: "empty"}) te.tapHandle = authHandle te.customDialOptions = []grpc.DialOption{ grpc.WithCredentialsBundle(&testCredsBundle{t: t}), @@ -89,7 +89,7 @@ func (s) TestCredsBundleBoth(t *testing.T) { } func (s) TestCredsBundleTransportCredentials(t *testing.T) { - te := newTest(t, env{name: "creds-bundle", network: "tcp", security: "empty"}) + te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: "v1", security: "empty"}) te.customDialOptions = []grpc.DialOption{ grpc.WithCredentialsBundle(&testCredsBundle{t: t, mode: bundleTLSOnly}), } @@ -111,7 +111,7 @@ func (s) TestCredsBundleTransportCredentials(t *testing.T) { } func (s) TestCredsBundlePerRPCCredentials(t *testing.T) { - te := newTest(t, env{name: "creds-bundle", network: "tcp", security: "empty"}) + te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: "v1", security: "empty"}) te.tapHandle = authHandle te.customDialOptions = []grpc.DialOption{ grpc.WithCredentialsBundle(&testCredsBundle{t: t, mode: bundlePerRPCOnly}), diff --git a/test/end2end_test.go b/test/end2end_test.go index f8e5d8ad131..f9491739bb2 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -50,6 +50,7 @@ import ( "golang.org/x/net/http2/hpack" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -420,7 +421,7 @@ type env struct { network string // The type of network such as tcp, unix, etc. security string // The security protocol such as TLS, SSH, etc. httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS - balancer string // One of "round_robin", "pick_first", or "". + balancer string // One of "round_robin", "pick_first", "v1", or "". customDialer func(string, string, time.Duration) (net.Conn, error) } @@ -439,8 +440,8 @@ func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) { } var ( - tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp"} - tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls"} + tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"} + tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"} tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"} tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"} handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"} @@ -837,8 +838,11 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) } else { scheme = te.resolverScheme + ":///" } - if te.e.balancer != "" { - opts = append(opts, grpc.WithBalancerName(te.e.balancer)) + switch te.e.balancer { + case "v1": + opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) + case "round_robin": + opts = append(opts, grpc.WithBalancerName(roundrobin.Name)) } if te.clientInitialWindowSize > 0 { opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) @@ -4762,7 +4766,7 @@ func (c *clientTimeoutCreds) OverrideServerName(s string) error { } func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { - te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds"}) + te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"}) te.userAgent = testAppUA te.startServer(&testServer{security: te.e.security}) defer te.tearDown() diff --git a/vet.sh b/vet.sh index 46e626c97e2..4691363086d 100755 --- a/vet.sh +++ b/vet.sh @@ -121,11 +121,14 @@ staticcheck -go 1.9 -checks 'inherit,-ST1015,-SA6002' ./... > "${SC_OUT}" || tru # Error if anything other than deprecation warnings are printed. (! grep -v "is deprecated:.*SA1019" "${SC_OUT}") # Only ignore the following deprecated types/fields/functions. -(! grep -Fv '.HeaderMap +(! grep -Fv '.HandleResolvedAddrs +.HandleSubConnStateChange +.HeaderMap .NewAddress .NewServiceConfig .Metadata is deprecated: use Attributes .Type is deprecated: use Attributes +.UpdateBalancerState balancer.Picker grpc.CallCustomCodec grpc.Code diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 909e89afd9c..0d49b3813f6 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/resolver" @@ -45,7 +46,7 @@ var ( // newEDSBalancer is a helper function to build a new edsBalancer and will be // overridden in unittests. - newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer { builder := balancer.Get(edsName) if builder == nil { grpclog.Errorf("xds: no balancer builder with name %v", edsName) @@ -54,7 +55,7 @@ var ( // We directly pass the parent clientConn to the // underlying edsBalancer because the cdsBalancer does // not deal with subConns. - return builder.Build(cc, opts) + return builder.Build(cc, opts).(balancer.V2Balancer) } ) @@ -149,7 +150,7 @@ type cdsBalancer struct { updateCh *buffer.Unbounded client xdsClientInterface cancelWatch func() - edsLB balancer.Balancer + edsLB balancer.V2Balancer clusterToWatch string // The only thing protected by this mutex is the closed boolean. This is @@ -339,3 +340,11 @@ func (b *cdsBalancer) isClosed() bool { b.mu.Unlock() return closed } + +func (b *cdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange") +} + +func (b *cdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + grpclog.Error("UpdateClientConnState should be called instead of HandleResolvedAddrs") +} diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 7cab7e1cfe7..6fc0bcaa259 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -209,11 +209,11 @@ func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.Clie func setup() (*cdsBalancer, *testEDSBalancer, func()) { builder := cdsBB{} tcc := &testClientConn{} - cdsB := builder.Build(tcc, balancer.BuildOptions{}) + cdsB := builder.Build(tcc, balancer.BuildOptions{}).(balancer.V2Balancer) edsB := newTestEDSBalancer() oldEDSBalancerBuilder := newEDSBalancer - newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer { return edsB } diff --git a/xds/internal/balancer/edsbalancer/balancergroup.go b/xds/internal/balancer/edsbalancer/balancergroup.go index 0e9205a0f8d..fc10f3d3412 100644 --- a/xds/internal/balancer/edsbalancer/balancergroup.go +++ b/xds/internal/balancer/edsbalancer/balancergroup.go @@ -66,6 +66,10 @@ type subBalancerWithConfig struct { balancer balancer.Balancer } +func (sbc *subBalancerWithConfig) UpdateBalancerState(state connectivity.State, picker balancer.Picker) { + grpclog.Fatalln("not implemented") +} + // UpdateState overrides balancer.ClientConn, to keep state and picker. func (sbc *subBalancerWithConfig) UpdateState(state balancer.State) { sbc.mu.Lock() @@ -91,7 +95,11 @@ func (sbc *subBalancerWithConfig) updateBalancerStateWithCachedPicker() { func (sbc *subBalancerWithConfig) startBalancer() { b := sbc.builder.Build(sbc, balancer.BuildOptions{}) sbc.balancer = b - b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: sbc.addrs}}) + if ub, ok := b.(balancer.V2Balancer); ok { + ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: sbc.addrs}}) + } else { + b.HandleResolvedAddrs(sbc.addrs, nil) + } } func (sbc *subBalancerWithConfig) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { @@ -103,7 +111,11 @@ func (sbc *subBalancerWithConfig) handleSubConnStateChange(sc balancer.SubConn, // happen. return } - b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state}) + if ub, ok := b.(balancer.V2Balancer); ok { + ub.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state}) + } else { + b.HandleSubConnStateChange(sc, state) + } } func (sbc *subBalancerWithConfig) updateAddrs(addrs []resolver.Address) { @@ -120,7 +132,11 @@ func (sbc *subBalancerWithConfig) updateAddrs(addrs []resolver.Address) { // it's the lower priority, but it can still get address updates. return } - b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}}) + if ub, ok := b.(balancer.V2Balancer); ok { + ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}}) + } else { + b.HandleResolvedAddrs(addrs, nil) + } } func (sbc *subBalancerWithConfig) stopBalancer() { @@ -130,7 +146,7 @@ func (sbc *subBalancerWithConfig) stopBalancer() { type pickerState struct { weight uint32 - picker balancer.Picker + picker balancer.V2Picker state connectivity.State } @@ -541,7 +557,7 @@ func buildPickerAndState(m map[internal.Locality]*pickerState) balancer.State { aggregatedState = connectivity.TransientFailure } if aggregatedState == connectivity.TransientFailure { - return balancer.State{aggregatedState, base.NewErrPicker(balancer.ErrTransientFailure)} + return balancer.State{aggregatedState, base.NewErrPickerV2(balancer.ErrTransientFailure)} } return balancer.State{aggregatedState, newPickerGroup(readyPickerWithWeights)} } @@ -577,7 +593,7 @@ func (pg *pickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) if pg.length <= 0 { return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } - p := pg.w.Next().(balancer.Picker) + p := pg.w.Next().(balancer.V2Picker) return p.Pick(info) } @@ -587,13 +603,13 @@ const ( ) type loadReportPicker struct { - p balancer.Picker + p balancer.V2Picker id internal.Locality loadStore lrs.Store } -func newLoadReportPicker(p balancer.Picker, id internal.Locality, loadStore lrs.Store) *loadReportPicker { +func newLoadReportPicker(p balancer.V2Picker, id internal.Locality, loadStore lrs.Store) *loadReportPicker { return &loadReportPicker{ p: p, id: id, diff --git a/xds/internal/balancer/edsbalancer/balancergroup_test.go b/xds/internal/balancer/edsbalancer/balancergroup_test.go index 1075878d1f9..d64296b8f0c 100644 --- a/xds/internal/balancer/edsbalancer/balancergroup_test.go +++ b/xds/internal/balancer/edsbalancer/balancergroup_test.go @@ -48,7 +48,7 @@ func init() { defaultSubBalancerCloseTimeout = time.Millisecond } -func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { +func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn { return func() balancer.SubConn { scst, _ := p.Pick(balancer.PickInfo{}) return scst.SubConn diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 151f49fca0f..2199e038190 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -29,6 +29,8 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/lrs" xdsclient "google.golang.org/grpc/xds/internal/client" @@ -96,6 +98,8 @@ type edsBalancerImplInterface interface { Close() } +var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer + // edsBalancer manages xdsClient and the actual EDS balancer implementation that // does load balancing. // @@ -192,6 +196,14 @@ type subConnStateUpdate struct { state balancer.SubConnState } +func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange") +} + +func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs") +} + func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { update := &subConnStateUpdate{ sc: sc, diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 68c754f5bc3..cbae7934940 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -392,6 +392,9 @@ type edsBalancerWrapperCC struct { func (ebwcc *edsBalancerWrapperCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { return ebwcc.parent.newSubConn(ebwcc.priority, addrs, opts) } +func (ebwcc *edsBalancerWrapperCC) UpdateBalancerState(state connectivity.State, picker balancer.Picker) { + grpclog.Fatalln("not implemented") +} func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) { ebwcc.parent.updateState(ebwcc.priority, state) } @@ -418,11 +421,11 @@ func (edsImpl *edsBalancerImpl) Close() { type dropPicker struct { drops []*dropper - p balancer.Picker + p balancer.V2Picker loadStore lrs.Store } -func newDropPicker(p balancer.Picker, drops []*dropper, loadStore lrs.Store) *dropPicker { +func newDropPicker(p balancer.V2Picker, drops []*dropper, loadStore lrs.Store) *dropPicker { return &dropPicker{ drops: drops, p: p, @@ -446,7 +449,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { if d.loadStore != nil { d.loadStore.CallDropped(category) } - return balancer.PickResult{}, balancer.DropRPCError(status.Errorf(codes.Unavailable, "RPC is dropped")) + return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped") } // TODO: (eds) don't drop unless the inner picker is READY. Similar to // https://github.com/grpc/grpc-go/issues/2622. diff --git a/xds/internal/balancer/edsbalancer/eds_impl_priority.go b/xds/internal/balancer/edsbalancer/eds_impl_priority.go index eb8cb6aea4e..bf3593db4d1 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_priority.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_priority.go @@ -45,7 +45,7 @@ func (edsImpl *edsBalancerImpl) handlePriorityChange() { // Everything was removed by EDS. if !edsImpl.priorityLowest.isSet() { edsImpl.priorityInUse = newPriorityTypeUnset() - edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(balancer.ErrTransientFailure)}) + edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPickerV2(balancer.ErrTransientFailure)}) return } @@ -69,7 +69,7 @@ func (edsImpl *edsBalancerImpl) handlePriorityChange() { // We don't have an old state to send to parent, but we also don't // want parent to keep using picker from old_priorityInUse. Send an // update to trigger block picks until a new picker is ready. - edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)}) + edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)}) } return } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 44a8e312e7d..3d8600054b6 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal" xdsclient "google.golang.org/grpc/xds/internal/client" ) @@ -384,20 +385,15 @@ type testConstBalancer struct { cc balancer.ClientConn } -func (tb *testConstBalancer) ResolverError(error) { - panic("not implemented") -} - -func (tb *testConstBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { +func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &testConstPicker{err: errTestConstPicker}}) } -func (tb *testConstBalancer) UpdateClientConnState(s balancer.ClientConnState) error { - a := s.ResolverState.Addresses - if len(a) > 0 { - tb.cc.NewSubConn(a, balancer.NewSubConnOptions{}) +func (tb *testConstBalancer) HandleResolvedAddrs(a []resolver.Address, err error) { + if len(a) == 0 { + return } - return nil + tb.cc.NewSubConn(a, balancer.NewSubConnOptions{}) } func (*testConstBalancer) Close() { diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index d336d17e50c..138630d7893 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -274,15 +274,11 @@ type fakeBalancer struct { cc balancer.ClientConn } -func (b *fakeBalancer) ResolverError(error) { +func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { panic("implement me") } -func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error { - panic("implement me") -} - -func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { +func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { panic("implement me") } diff --git a/xds/internal/balancer/edsbalancer/test_util_test.go b/xds/internal/balancer/edsbalancer/test_util_test.go index fba7b3c127e..a2b06680250 100644 --- a/xds/internal/balancer/edsbalancer/test_util_test.go +++ b/xds/internal/balancer/edsbalancer/test_util_test.go @@ -65,7 +65,7 @@ type testClientConn struct { newSubConnCh chan balancer.SubConn // The last 10 subconn created. removeSubConnCh chan balancer.SubConn // The last 10 subconn removed. - newPickerCh chan balancer.Picker // The last picker updated. + newPickerCh chan balancer.V2Picker // The last picker updated. newStateCh chan connectivity.State // The last state. subConnIdx int @@ -79,7 +79,7 @@ func newTestClientConn(t *testing.T) *testClientConn { newSubConnCh: make(chan balancer.SubConn, 10), removeSubConnCh: make(chan balancer.SubConn, 10), - newPickerCh: make(chan balancer.Picker, 1), + newPickerCh: make(chan balancer.V2Picker, 1), newStateCh: make(chan connectivity.State, 1), } } @@ -110,6 +110,10 @@ func (tcc *testClientConn) RemoveSubConn(sc balancer.SubConn) { } } +func (tcc *testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + tcc.t.Fatal("not implemented") +} + func (tcc *testClientConn) UpdateState(bs balancer.State) { tcc.t.Logf("testClientConn: UpdateState(%v)", bs) select {