From 236d7a063c7a8c2ad2f87990ed5fdeed6c3b8dbf Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 23 Apr 2020 12:35:05 -0700 Subject: [PATCH] Updates after rebase and review comments --- balancer/balancer.go | 23 +- balancer/grpclb/grpclb_remote_balancer.go | 2 +- balancer/rls/internal/cache/cache.go | 4 +- balancer/rls/internal/picker.go | 5 +- clientconn.go | 21 +- dialoptions.go | 3 +- examples/go.sum | 2 + internal/status/status.go | 24 +- naming/dns_resolver.go | 293 ------- naming/dns_resolver_test.go | 341 -------- naming/naming.go | 68 -- picker_wrapper.go | 26 +- pickfirst.go | 2 +- service_config.go | 2 +- status/status_test.go | 4 +- test/channelz_test.go | 3 +- test/creds_test.go | 4 +- test/end2end_test.go | 205 +---- vet.sh | 9 +- .../balancer/balancergroup/balancergroup.go | 3 +- .../balancergroup/balancergroup_test.go | 2 +- .../edsbalancer/balancergroup_test.go | 796 ------------------ .../balancer/edsbalancer/eds_impl_test.go | 53 +- xds/internal/balancer/edsbalancer/eds_test.go | 2 +- .../balancer/edsbalancer/test_util_test.go | 346 -------- xds/internal/testutils/balancer.go | 19 +- 26 files changed, 79 insertions(+), 2183 deletions(-) delete mode 100644 naming/dns_resolver.go delete mode 100644 naming/dns_resolver_test.go delete mode 100644 naming/naming.go delete mode 100644 xds/internal/balancer/edsbalancer/balancergroup_test.go delete mode 100644 xds/internal/balancer/edsbalancer/test_util_test.go diff --git a/balancer/balancer.go b/balancer/balancer.go index 11a6bab2864..dc75a957183 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -27,9 +27,11 @@ import ( "net" "strings" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal" + istatus "google.golang.org/grpc/internal/status" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -149,7 +151,7 @@ type ClientConn interface { // changed. // // gRPC will update the connectivity state of the ClientConn, and will call - // pick on the new picker to pick new SubConns. + // Pick on the new Picker to pick new SubConns. UpdateState(State) // ResolveNow is called by balancer to notify gRPC to do a name resolving. @@ -250,20 +252,23 @@ type PickResult struct { } type dropRPCError struct { - error - status *status.Status + *istatus.ErrorT } -func (e *dropRPCError) DropRPC() bool { return true } - -func (e *dropRPCError) GRPCStatus() *status.Status { return e.status } +func (e dropRPCError) DropRPC() bool { return true } // DropRPCError wraps err in an error implementing DropRPC() bool, returning // true. If err is not a status error, it will be converted to one with code -// Unknown and the message containing the err.Error() result. +// Unknown and the message containing the err.Error() result. DropRPCError +// should not be called with a nil error. func DropRPCError(err error) error { - st := status.Convert(err) - return &dropRPCError{error: st.Err(), status: st} + if err == nil { + return dropRPCError{ErrorT: status.Error(codes.Unknown, "nil error passed to DropRPCError").(*istatus.ErrorT)} + } + if se, ok := err.(*istatus.ErrorT); ok { + return dropRPCError{ErrorT: se} + } + return dropRPCError{ErrorT: status.Convert(err).Err().(*istatus.ErrorT)} } // TransientFailureError returns e. 
It exists for backward compatibility and diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index c6d555e4d8c..302d71316d5 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -192,7 +192,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback lb.cc.RemoveSubConn(sc) delete(lb.subConns, a) // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. - // The entry will be deleted in HandleSubConnStateChange. + // The entry will be deleted in UpdateSubConnState. } } diff --git a/balancer/rls/internal/cache/cache.go b/balancer/rls/internal/cache/cache.go index c945d272ab1..dd03695e0e9 100644 --- a/balancer/rls/internal/cache/cache.go +++ b/balancer/rls/internal/cache/cache.go @@ -85,10 +85,10 @@ type Entry struct { // X-Google-RLS-Data header for matching RPCs. HeaderData string // ChildPicker is a very thin wrapper around the child policy wrapper. - // The type is declared as a V2Picker interface since the users of + // The type is declared as a Picker interface since the users of // the cache only care about the picker provided by the child policy, and // this makes it easy for testing. - ChildPicker balancer.V2Picker + ChildPicker balancer.Picker // size stores the size of this cache entry. Uses only a subset of the // fields. See `entrySize` for this is computed. diff --git a/balancer/rls/internal/picker.go b/balancer/rls/internal/picker.go index e823cf581b4..698185b1595 100644 --- a/balancer/rls/internal/picker.go +++ b/balancer/rls/internal/picker.go @@ -31,10 +31,7 @@ import ( "google.golang.org/grpc/metadata" ) -var errRLSThrottled = balancer.TransientFailureError(errors.New("RLS call throttled at client side")) - -// Compile time assert to ensure we implement V2Picker. -var _ balancer.V2Picker = (*rlsPicker)(nil) +var errRLSThrottled = errors.New("RLS call throttled at client side") // RLS rlsPicker selects the subConn to be used for a particular RPC. It does // not manage subConns directly and usually deletegates to pickers provided by diff --git a/clientconn.go b/clientconn.go index 71cae0ffe6a..ef327e8af4f 100644 --- a/clientconn.go +++ b/clientconn.go @@ -316,7 +316,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if s == connectivity.Ready { break } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { - if err = cc.blockingpicker.connectionError(); err != nil { + if err = cc.connectionError(); err != nil { terr, ok := err.(interface { Temporary() bool }) @@ -327,7 +327,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } if !cc.WaitForStateChange(ctx, s) { // ctx got timeout or canceled. 
- if err = cc.blockingpicker.connectionError(); err != nil && cc.dopts.returnLastError { + if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { return nil, err } return nil, ctx.Err() @@ -498,6 +498,9 @@ type ClientConn struct { channelzID int64 // channelz unique identification number czData *channelzData + + lceMu sync.Mutex // protects lastConnectionError + lastConnectionError error } // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or @@ -1207,7 +1210,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T if firstConnErr == nil { firstConnErr = err } - ac.cc.blockingpicker.updateConnectionError(err) + ac.cc.updateConnectionError(err) } // Couldn't connect to any address. @@ -1533,3 +1536,15 @@ func (cc *ClientConn) getResolver(scheme string) resolver.Builder { } return resolver.Get(scheme) } + +func (cc *ClientConn) updateConnectionError(err error) { + cc.lceMu.Lock() + cc.lastConnectionError = err + cc.lceMu.Unlock() +} + +func (cc *ClientConn) connectionError() error { + cc.lceMu.Lock() + defer cc.lceMu.Unlock() + return cc.lastConnectionError +} diff --git a/dialoptions.go b/dialoptions.go index 5fa6dec41f3..b5c810927b1 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -57,8 +57,7 @@ type dialOptions struct { authority string copts transport.ConnectOptions callOptions []CallOption - // This is used by v1 balancer dial option WithBalancer to support v1 - // balancer, and also by WithBalancerName dial option. + // This is used by WithBalancerName dial option. balancerBuilder balancer.Builder channelzParentID int64 disableServiceConfig bool diff --git a/examples/go.sum b/examples/go.sum index 0d649b1f5ae..7aa49f55984 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -63,6 +63,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.30.0-dev.1 h1:UPWdABFs9zu2kdq7GrCUcfnVgCT65hSpvHmy0RiKn0M= +google.golang.org/grpc v1.30.0-dev.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/internal/status/status.go b/internal/status/status.go index 681260692e3..e5f4af1b090 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -97,7 +97,7 @@ func (s *Status) Err() error { if s.Code() == codes.OK { return nil } - return (*Error)(s.Proto()) + return (*ErrorT)(s.Proto()) } // WithDetails returns a new status with the provided details messages appended to the status. @@ -136,26 +136,26 @@ func (s *Status) Details() []interface{} { return details } -// Error is an alias of a status proto. It implements error and Status, -// and a nil Error should never be returned by this package. -type Error spb.Status +// ErrorT is an alias of a status proto. It implements error and Status, +// and a nil *ErrorT should never be returned by this package. 
+type ErrorT spb.Status

-func (se *Error) Error() string {
-	p := (*spb.Status)(se)
+func (e *ErrorT) Error() string {
+	p := (*spb.Status)(e)
 	return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(p.GetCode()), p.GetMessage())
 }

-// GRPCStatus returns the Status represented by se.
-func (se *Error) GRPCStatus() *Status {
-	return FromProto((*spb.Status)(se))
+// GRPCStatus returns the Status represented by e.
+func (e *ErrorT) GRPCStatus() *Status {
+	return FromProto((*spb.Status)(e))
 }

 // Is implements future error.Is functionality.
-// A Error is equivalent if the code and message are identical.
-func (se *Error) Is(target error) bool {
-	tse, ok := target.(*Error)
+// An ErrorT is equivalent if the code and message are identical.
+func (e *ErrorT) Is(target error) bool {
+	tse, ok := target.(*ErrorT)
 	if !ok {
 		return false
 	}
-	return proto.Equal((*spb.Status)(se), (*spb.Status)(tse))
+	return proto.Equal((*spb.Status)(e), (*spb.Status)(tse))
 }
diff --git a/naming/dns_resolver.go b/naming/dns_resolver.go
deleted file mode 100644
index c9f79dc5336..00000000000
--- a/naming/dns_resolver.go
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package naming
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"net"
-	"strconv"
-	"time"
-
-	"google.golang.org/grpc/grpclog"
-)
-
-const (
-	defaultPort = "443"
-	defaultFreq = time.Minute * 30
-)
-
-var (
-	errMissingAddr  = errors.New("missing address")
-	errWatcherClose = errors.New("watcher has been closed")
-
-	lookupHost = net.DefaultResolver.LookupHost
-	lookupSRV  = net.DefaultResolver.LookupSRV
-)
-
-// NewDNSResolverWithFreq creates a DNS Resolver that can resolve DNS names, and
-// create watchers that poll the DNS server using the frequency set by freq.
-func NewDNSResolverWithFreq(freq time.Duration) (Resolver, error) {
-	return &dnsResolver{freq: freq}, nil
-}
-
-// NewDNSResolver creates a DNS Resolver that can resolve DNS names, and create
-// watchers that poll the DNS server using the default frequency defined by defaultFreq.
-func NewDNSResolver() (Resolver, error) {
-	return NewDNSResolverWithFreq(defaultFreq)
-}
-
-// dnsResolver handles name resolution for names following the DNS scheme
-type dnsResolver struct {
-	// frequency of polling the DNS server that the watchers created by this resolver will use.
-	freq time.Duration
-}
-
-// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
-// If addr is an IPv4 address, return the addr and ok = true.
-// If addr is an IPv6 address, return the addr enclosed in square brackets and ok = true.
-func formatIP(addr string) (addrIP string, ok bool) {
-	ip := net.ParseIP(addr)
-	if ip == nil {
-		return "", false
-	}
-	if ip.To4() != nil {
-		return addr, true
-	}
-	return "[" + addr + "]", true
-}
-
-// parseTarget takes the user input target string, returns formatted host and port info.
-// If target doesn't specify a port, set the port to be the defaultPort.
-// If target is in IPv6 format and host-name is enclosed in square brackets, brackets -// are stripped when setting the host. -// examples: -// target: "www.google.com" returns host: "www.google.com", port: "443" -// target: "ipv4-host:80" returns host: "ipv4-host", port: "80" -// target: "[ipv6-host]" returns host: "ipv6-host", port: "443" -// target: ":80" returns host: "localhost", port: "80" -// target: ":" returns host: "localhost", port: "443" -func parseTarget(target string) (host, port string, err error) { - if target == "" { - return "", "", errMissingAddr - } - - if ip := net.ParseIP(target); ip != nil { - // target is an IPv4 or IPv6(without brackets) address - return target, defaultPort, nil - } - if host, port, err := net.SplitHostPort(target); err == nil { - // target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port - if host == "" { - // Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed. - host = "localhost" - } - if port == "" { - // If the port field is empty(target ends with colon), e.g. "[::1]:", defaultPort is used. - port = defaultPort - } - return host, port, nil - } - if host, port, err := net.SplitHostPort(target + ":" + defaultPort); err == nil { - // target doesn't have port - return host, port, nil - } - return "", "", fmt.Errorf("invalid target address %v", target) -} - -// Resolve creates a watcher that watches the name resolution of the target. -func (r *dnsResolver) Resolve(target string) (Watcher, error) { - host, port, err := parseTarget(target) - if err != nil { - return nil, err - } - - if net.ParseIP(host) != nil { - ipWatcher := &ipWatcher{ - updateChan: make(chan *Update, 1), - } - host, _ = formatIP(host) - ipWatcher.updateChan <- &Update{Op: Add, Addr: host + ":" + port} - return ipWatcher, nil - } - - ctx, cancel := context.WithCancel(context.Background()) - return &dnsWatcher{ - r: r, - host: host, - port: port, - ctx: ctx, - cancel: cancel, - t: time.NewTimer(0), - }, nil -} - -// dnsWatcher watches for the name resolution update for a specific target -type dnsWatcher struct { - r *dnsResolver - host string - port string - // The latest resolved address set - curAddrs map[string]*Update - ctx context.Context - cancel context.CancelFunc - t *time.Timer -} - -// ipWatcher watches for the name resolution update for an IP address. -type ipWatcher struct { - updateChan chan *Update -} - -// Next returns the address resolution Update for the target. For IP address, -// the resolution is itself, thus polling name server is unnecessary. Therefore, -// Next() will return an Update the first time it is called, and will be blocked -// for all following calls as no Update exists until watcher is closed. -func (i *ipWatcher) Next() ([]*Update, error) { - u, ok := <-i.updateChan - if !ok { - return nil, errWatcherClose - } - return []*Update{u}, nil -} - -// Close closes the ipWatcher. -func (i *ipWatcher) Close() { - close(i.updateChan) -} - -// AddressType indicates the address type returned by name resolution. -type AddressType uint8 - -const ( - // Backend indicates the server is a backend server. - Backend AddressType = iota - // GRPCLB indicates the server is a grpclb load balancer. - GRPCLB -) - -// AddrMetadataGRPCLB contains the information the name resolver for grpclb should provide. The -// name resolver used by the grpclb balancer is required to provide this type of metadata in -// its address updates. 
-type AddrMetadataGRPCLB struct { - // AddrType is the type of server (grpc load balancer or backend). - AddrType AddressType - // ServerName is the name of the grpc load balancer. Used for authentication. - ServerName string -} - -// compileUpdate compares the old resolved addresses and newly resolved addresses, -// and generates an update list -func (w *dnsWatcher) compileUpdate(newAddrs map[string]*Update) []*Update { - var res []*Update - for a, u := range w.curAddrs { - if _, ok := newAddrs[a]; !ok { - u.Op = Delete - res = append(res, u) - } - } - for a, u := range newAddrs { - if _, ok := w.curAddrs[a]; !ok { - res = append(res, u) - } - } - return res -} - -func (w *dnsWatcher) lookupSRV() map[string]*Update { - newAddrs := make(map[string]*Update) - _, srvs, err := lookupSRV(w.ctx, "grpclb", "tcp", w.host) - if err != nil { - grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err) - return nil - } - for _, s := range srvs { - lbAddrs, err := lookupHost(w.ctx, s.Target) - if err != nil { - grpclog.Warningf("grpc: failed load balancer address dns lookup due to %v.\n", err) - continue - } - for _, a := range lbAddrs { - a, ok := formatIP(a) - if !ok { - grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err) - continue - } - addr := a + ":" + strconv.Itoa(int(s.Port)) - newAddrs[addr] = &Update{Addr: addr, - Metadata: AddrMetadataGRPCLB{AddrType: GRPCLB, ServerName: s.Target}} - } - } - return newAddrs -} - -func (w *dnsWatcher) lookupHost() map[string]*Update { - newAddrs := make(map[string]*Update) - addrs, err := lookupHost(w.ctx, w.host) - if err != nil { - grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err) - return nil - } - for _, a := range addrs { - a, ok := formatIP(a) - if !ok { - grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err) - continue - } - addr := a + ":" + w.port - newAddrs[addr] = &Update{Addr: addr} - } - return newAddrs -} - -func (w *dnsWatcher) lookup() []*Update { - newAddrs := w.lookupSRV() - if newAddrs == nil { - // If failed to get any balancer address (either no corresponding SRV for the - // target, or caused by failure during resolution/parsing of the balancer target), - // return any A record info available. - newAddrs = w.lookupHost() - } - result := w.compileUpdate(newAddrs) - w.curAddrs = newAddrs - return result -} - -// Next returns the resolved address update(delta) for the target. If there's no -// change, it will sleep for 30 mins and try to resolve again after that. -func (w *dnsWatcher) Next() ([]*Update, error) { - for { - select { - case <-w.ctx.Done(): - return nil, errWatcherClose - case <-w.t.C: - } - result := w.lookup() - // Next lookup should happen after an interval defined by w.r.freq. - w.t.Reset(w.r.freq) - if len(result) > 0 { - return result, nil - } - } -} - -func (w *dnsWatcher) Close() { - w.cancel() -} diff --git a/naming/dns_resolver_test.go b/naming/dns_resolver_test.go deleted file mode 100644 index a7eff2d4038..00000000000 --- a/naming/dns_resolver_test.go +++ /dev/null @@ -1,341 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package naming - -import ( - "context" - "fmt" - "net" - "reflect" - "sync" - "testing" - "time" - - "google.golang.org/grpc/internal/grpctest" -) - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -func newUpdateWithMD(op Operation, addr, lb string) *Update { - return &Update{ - Op: op, - Addr: addr, - Metadata: AddrMetadataGRPCLB{AddrType: GRPCLB, ServerName: lb}, - } -} - -func toMap(u []*Update) map[string]*Update { - m := make(map[string]*Update) - for _, v := range u { - m[v.Addr] = v - } - return m -} - -func (s) TestCompileUpdate(t *testing.T) { - tests := []struct { - oldAddrs []string - newAddrs []string - want []*Update - }{ - { - []string{}, - []string{"1.0.0.1"}, - []*Update{{Op: Add, Addr: "1.0.0.1"}}, - }, - { - []string{"1.0.0.1"}, - []string{"1.0.0.1"}, - []*Update{}, - }, - { - []string{"1.0.0.0"}, - []string{"1.0.0.1"}, - []*Update{{Op: Delete, Addr: "1.0.0.0"}, {Op: Add, Addr: "1.0.0.1"}}, - }, - { - []string{"1.0.0.1"}, - []string{"1.0.0.0"}, - []*Update{{Op: Add, Addr: "1.0.0.0"}, {Op: Delete, Addr: "1.0.0.1"}}, - }, - { - []string{"1.0.0.1"}, - []string{"1.0.0.1", "1.0.0.2", "1.0.0.3"}, - []*Update{{Op: Add, Addr: "1.0.0.2"}, {Op: Add, Addr: "1.0.0.3"}}, - }, - { - []string{"1.0.0.1", "1.0.0.2", "1.0.0.3"}, - []string{"1.0.0.0"}, - []*Update{{Op: Add, Addr: "1.0.0.0"}, {Op: Delete, Addr: "1.0.0.1"}, {Op: Delete, Addr: "1.0.0.2"}, {Op: Delete, Addr: "1.0.0.3"}}, - }, - { - []string{"1.0.0.1", "1.0.0.3", "1.0.0.5"}, - []string{"1.0.0.2", "1.0.0.3", "1.0.0.6"}, - []*Update{{Op: Delete, Addr: "1.0.0.1"}, {Op: Add, Addr: "1.0.0.2"}, {Op: Delete, Addr: "1.0.0.5"}, {Op: Add, Addr: "1.0.0.6"}}, - }, - { - []string{"1.0.0.1", "1.0.0.1", "1.0.0.2"}, - []string{"1.0.0.1"}, - []*Update{{Op: Delete, Addr: "1.0.0.2"}}, - }, - } - - var w dnsWatcher - for _, c := range tests { - w.curAddrs = make(map[string]*Update) - newUpdates := make(map[string]*Update) - for _, a := range c.oldAddrs { - w.curAddrs[a] = &Update{Addr: a} - } - for _, a := range c.newAddrs { - newUpdates[a] = &Update{Addr: a} - } - r := w.compileUpdate(newUpdates) - if !reflect.DeepEqual(toMap(c.want), toMap(r)) { - t.Errorf("w(%+v).compileUpdate(%+v) = %+v, want %+v", c.oldAddrs, c.newAddrs, updatesToSlice(r), updatesToSlice(c.want)) - } - } -} - -func (s) TestResolveFunc(t *testing.T) { - tests := []struct { - addr string - want error - }{ - // TODO(yuxuanli): More false cases? 
- {"www.google.com", nil}, - {"foo.bar:12345", nil}, - {"127.0.0.1", nil}, - {"127.0.0.1:12345", nil}, - {"[::1]:80", nil}, - {"[2001:db8:a0b:12f0::1]:21", nil}, - {":80", nil}, - {"127.0.0...1:12345", nil}, - {"[fe80::1%lo0]:80", nil}, - {"golang.org:http", nil}, - {"[2001:db8::1]:http", nil}, - {":", nil}, - {"", errMissingAddr}, - {"[2001:db8:a0b:12f0::1", fmt.Errorf("invalid target address %v", "[2001:db8:a0b:12f0::1")}, - } - - r, err := NewDNSResolver() - if err != nil { - t.Errorf("%v", err) - } - for _, v := range tests { - _, err := r.Resolve(v.addr) - if !reflect.DeepEqual(err, v.want) { - t.Errorf("Resolve(%q) = %v, want %v", v.addr, err, v.want) - } - } -} - -var hostLookupTbl = map[string][]string{ - "foo.bar.com": {"1.2.3.4", "5.6.7.8"}, - "ipv4.single.fake": {"1.2.3.4"}, - "ipv4.multi.fake": {"1.2.3.4", "5.6.7.8", "9.10.11.12"}, - "ipv6.single.fake": {"2607:f8b0:400a:801::1001"}, - "ipv6.multi.fake": {"2607:f8b0:400a:801::1001", "2607:f8b0:400a:801::1002", "2607:f8b0:400a:801::1003"}, -} - -func hostLookup(host string) ([]string, error) { - if addrs, ok := hostLookupTbl[host]; ok { - return addrs, nil - } - return nil, fmt.Errorf("failed to lookup host:%s resolution in hostLookupTbl", host) -} - -var srvLookupTbl = map[string][]*net.SRV{ - "_grpclb._tcp.srv.ipv4.single.fake": {&net.SRV{Target: "ipv4.single.fake", Port: 1234}}, - "_grpclb._tcp.srv.ipv4.multi.fake": {&net.SRV{Target: "ipv4.multi.fake", Port: 1234}}, - "_grpclb._tcp.srv.ipv6.single.fake": {&net.SRV{Target: "ipv6.single.fake", Port: 1234}}, - "_grpclb._tcp.srv.ipv6.multi.fake": {&net.SRV{Target: "ipv6.multi.fake", Port: 1234}}, -} - -func srvLookup(service, proto, name string) (string, []*net.SRV, error) { - cname := "_" + service + "._" + proto + "." + name - if srvs, ok := srvLookupTbl[cname]; ok { - return cname, srvs, nil - } - return "", nil, fmt.Errorf("failed to lookup srv record for %s in srvLookupTbl", cname) -} - -func updatesToSlice(updates []*Update) []Update { - res := make([]Update, len(updates)) - for i, u := range updates { - res[i] = *u - } - return res -} - -func testResolver(t *testing.T, freq time.Duration, slp time.Duration) { - tests := []struct { - target string - want []*Update - }{ - { - "foo.bar.com", - []*Update{{Op: Add, Addr: "1.2.3.4" + colonDefaultPort}, {Op: Add, Addr: "5.6.7.8" + colonDefaultPort}}, - }, - { - "foo.bar.com:1234", - []*Update{{Op: Add, Addr: "1.2.3.4:1234"}, {Op: Add, Addr: "5.6.7.8:1234"}}, - }, - { - "srv.ipv4.single.fake", - []*Update{newUpdateWithMD(Add, "1.2.3.4:1234", "ipv4.single.fake")}, - }, - { - "srv.ipv4.multi.fake", - []*Update{ - newUpdateWithMD(Add, "1.2.3.4:1234", "ipv4.multi.fake"), - newUpdateWithMD(Add, "5.6.7.8:1234", "ipv4.multi.fake"), - newUpdateWithMD(Add, "9.10.11.12:1234", "ipv4.multi.fake")}, - }, - { - "srv.ipv6.single.fake", - []*Update{newUpdateWithMD(Add, "[2607:f8b0:400a:801::1001]:1234", "ipv6.single.fake")}, - }, - { - "srv.ipv6.multi.fake", - []*Update{ - newUpdateWithMD(Add, "[2607:f8b0:400a:801::1001]:1234", "ipv6.multi.fake"), - newUpdateWithMD(Add, "[2607:f8b0:400a:801::1002]:1234", "ipv6.multi.fake"), - newUpdateWithMD(Add, "[2607:f8b0:400a:801::1003]:1234", "ipv6.multi.fake"), - }, - }, - } - - for _, a := range tests { - r, err := NewDNSResolverWithFreq(freq) - if err != nil { - t.Fatalf("%v\n", err) - } - w, err := r.Resolve(a.target) - if err != nil { - t.Fatalf("%v\n", err) - } - updates, err := w.Next() - if err != nil { - t.Fatalf("%v\n", err) - } - if !reflect.DeepEqual(toMap(a.want), toMap(updates)) { - 
t.Errorf("Resolve(%q) = %+v, want %+v\n", a.target, updatesToSlice(updates), updatesToSlice(a.want)) - } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for { - _, err := w.Next() - if err != nil { - return - } - t.Error("Execution shouldn't reach here, since w.Next() should be blocked until close happen.") - } - }() - // Sleep for sometime to let watcher do more than one lookup - time.Sleep(slp) - w.Close() - wg.Wait() - } -} - -func replaceNetFunc() func() { - oldLookupHost := lookupHost - oldLookupSRV := lookupSRV - lookupHost = func(ctx context.Context, host string) ([]string, error) { - return hostLookup(host) - } - lookupSRV = func(ctx context.Context, service, proto, name string) (string, []*net.SRV, error) { - return srvLookup(service, proto, name) - } - return func() { - lookupHost = oldLookupHost - lookupSRV = oldLookupSRV - } -} - -func (s) TestResolve(t *testing.T) { - defer replaceNetFunc()() - testResolver(t, time.Millisecond*5, time.Millisecond*10) -} - -const colonDefaultPort = ":" + defaultPort - -func (s) TestIPWatcher(t *testing.T) { - tests := []struct { - target string - want []*Update - }{ - {"127.0.0.1", []*Update{{Op: Add, Addr: "127.0.0.1" + colonDefaultPort}}}, - {"127.0.0.1:12345", []*Update{{Op: Add, Addr: "127.0.0.1:12345"}}}, - {"::1", []*Update{{Op: Add, Addr: "[::1]" + colonDefaultPort}}}, - {"[::1]:12345", []*Update{{Op: Add, Addr: "[::1]:12345"}}}, - {"[::1]:", []*Update{{Op: Add, Addr: "[::1]:443"}}}, - {"2001:db8:85a3::8a2e:370:7334", []*Update{{Op: Add, Addr: "[2001:db8:85a3::8a2e:370:7334]" + colonDefaultPort}}}, - {"[2001:db8:85a3::8a2e:370:7334]", []*Update{{Op: Add, Addr: "[2001:db8:85a3::8a2e:370:7334]" + colonDefaultPort}}}, - {"[2001:db8:85a3::8a2e:370:7334]:12345", []*Update{{Op: Add, Addr: "[2001:db8:85a3::8a2e:370:7334]:12345"}}}, - {"[2001:db8::1]:http", []*Update{{Op: Add, Addr: "[2001:db8::1]:http"}}}, - // TODO(yuxuanli): zone support? - } - - for _, v := range tests { - r, err := NewDNSResolverWithFreq(time.Millisecond * 5) - if err != nil { - t.Fatalf("%v\n", err) - } - w, err := r.Resolve(v.target) - if err != nil { - t.Fatalf("%v\n", err) - } - var updates []*Update - var wg sync.WaitGroup - wg.Add(1) - count := 0 - go func() { - defer wg.Done() - for { - u, err := w.Next() - if err != nil { - return - } - updates = u - count++ - } - }() - // Sleep for sometime to let watcher do more than one lookup - time.Sleep(time.Millisecond * 10) - w.Close() - wg.Wait() - if !reflect.DeepEqual(v.want, updates) { - t.Errorf("Resolve(%q) = %v, want %+v\n", v.target, updatesToSlice(updates), updatesToSlice(v.want)) - } - if count != 1 { - t.Errorf("IPWatcher Next() should return only once, not %d times\n", count) - } - } -} diff --git a/naming/naming.go b/naming/naming.go deleted file mode 100644 index f4c1c8b6894..00000000000 --- a/naming/naming.go +++ /dev/null @@ -1,68 +0,0 @@ -/* - * - * Copyright 2014 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -// Package naming defines the naming API and related data structures for gRPC. -// -// This package is deprecated: please use package resolver instead. -package naming - -// Operation defines the corresponding operations for a name resolution change. -// -// Deprecated: please use package resolver. -type Operation uint8 - -const ( - // Add indicates a new address is added. - Add Operation = iota - // Delete indicates an existing address is deleted. - Delete -) - -// Update defines a name resolution update. Notice that it is not valid having both -// empty string Addr and nil Metadata in an Update. -// -// Deprecated: please use package resolver. -type Update struct { - // Op indicates the operation of the update. - Op Operation - // Addr is the updated address. It is empty string if there is no address update. - Addr string - // Metadata is the updated metadata. It is nil if there is no metadata update. - // Metadata is not required for a custom naming implementation. - Metadata interface{} -} - -// Resolver creates a Watcher for a target to track its resolution changes. -// -// Deprecated: please use package resolver. -type Resolver interface { - // Resolve creates a Watcher for target. - Resolve(target string) (Watcher, error) -} - -// Watcher watches for the updates on the specified target. -// -// Deprecated: please use package resolver. -type Watcher interface { - // Next blocks until an update or error happens. It may return one or more - // updates. The first call should get the full set of the results. It should - // return an error if and only if Watcher cannot recover. - Next() ([]*Update, error) - // Close closes the Watcher. - Close() -} diff --git a/picker_wrapper.go b/picker_wrapper.go index a2e996d7295..364fe96caac 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -38,32 +38,10 @@ type pickerWrapper struct { done bool blockingCh chan struct{} picker balancer.Picker - - // The latest connection error. TODO: remove when V1 picker is deprecated; - // balancer should be responsible for providing the error. - *connErr -} - -type connErr struct { - mu sync.Mutex - err error -} - -func (c *connErr) updateConnectionError(err error) { - c.mu.Lock() - c.err = err - c.mu.Unlock() -} - -func (c *connErr) connectionError() error { - c.mu.Lock() - err := c.err - c.mu.Unlock() - return err } func newPickerWrapper() *pickerWrapper { - return &pickerWrapper{blockingCh: make(chan struct{}), connErr: &connErr{}} + return &pickerWrapper{blockingCh: make(chan struct{})} } // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. @@ -128,8 +106,6 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. 
var errStr string if lastPickErr != nil { errStr = "latest balancer error: " + lastPickErr.Error() - } else if connectionErr := pw.connectionError(); connectionErr != nil { - errStr = "latest connection error: " + connectionErr.Error() } else { errStr = ctx.Err().Error() } diff --git a/pickfirst.go b/pickfirst.go index 7ba90ae5ab1..95791a56554 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -94,7 +94,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { if grpclog.V(2) { - grpclog.Infof("pickfirstBalancer: HandleSubConnStateChange: %p, %v", sc, s) + grpclog.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s) } if b.sc != sc { if grpclog.V(2) { diff --git a/service_config.go b/service_config.go index c8267fc8eb3..37d4a58f122 100644 --- a/service_config.go +++ b/service_config.go @@ -79,7 +79,7 @@ type ServiceConfig struct { serviceconfig.Config // LB is the load balancer the service providers recommends. The balancer - // specified via grpc.WithBalancer will override this. This is deprecated; + // specified via grpc.WithBalancerName will override this. This is deprecated; // lbConfigs is preferred. If lbConfig and LB are both present, lbConfig // will be used. LB *string diff --git a/status/status_test.go b/status/status_test.go index 839a3c390ed..1f8578e27a6 100644 --- a/status/status_test.go +++ b/status/status_test.go @@ -64,7 +64,7 @@ func (s) TestErrorsWithSameParameters(t *testing.T) { e1 := Errorf(codes.AlreadyExists, description) e2 := Errorf(codes.AlreadyExists, description) if e1 == e2 || !errEqual(e1, e2) { - t.Fatalf("Errors should be equivalent but unique - e1: %v, %v e2: %p, %v", e1.(*status.Error), e1, e2.(*status.Error), e2) + t.Fatalf("Errors should be equivalent but unique - e1: %v, %v e2: %p, %v", e1.(*status.ErrorT), e1, e2.(*status.ErrorT), e2) } } @@ -116,7 +116,7 @@ func (s) TestError(t *testing.T) { func (s) TestErrorOK(t *testing.T) { err := Error(codes.OK, "foo") if err != nil { - t.Fatalf("Error(codes.OK, _) = %p; want nil", err.(*status.Error)) + t.Fatalf("Error(codes.OK, _) = %p; want nil", err.(*status.ErrorT)) } } diff --git a/test/channelz_test.go b/test/channelz_test.go index 75846640e22..c69e0cec2e6 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -1880,7 +1880,8 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) { czCleanup := channelz.NewChannelzStorage() defer czCleanupWrapper(czCleanup, t) e := tcpClearRREnv - // avoid newTest using WithBalancer, which would override service config's change of balancer below. + // avoid newTest using WithBalancerName, which would override service + // config's change of balancer below. 
e.balancer = "" te := newTest(t, e) channelz.SetMaxTraceEntry(1) diff --git a/test/creds_test.go b/test/creds_test.go index bc4dca17a78..7b607ef0371 100644 --- a/test/creds_test.go +++ b/test/creds_test.go @@ -157,7 +157,7 @@ func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials { } func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { - te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "empty", balancer: "v1"}) + te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "empty"}) te.userAgent = testAppUA te.startServer(&testServer{security: te.e.security}) defer te.tearDown() @@ -181,7 +181,7 @@ func (m *methodTestCreds) RequireTransportSecurity() bool { return false } func (s) TestGRPCMethodAccessibleToCredsViaContextRequestInfo(t *testing.T) { const wantMethod = "/grpc.testing.TestService/EmptyCall" - te := newTest(t, env{name: "context-request-info", network: "tcp", balancer: "v1"}) + te := newTest(t, env{name: "context-request-info", network: "tcp"}) te.userAgent = testAppUA te.startServer(&testServer{security: te.e.security}) defer te.tearDown() diff --git a/test/end2end_test.go b/test/end2end_test.go index 7f628f9d914..f3a60de5a96 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -567,6 +567,7 @@ func newTest(t *testing.T, e env) *test { } func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener { + te.t.Helper() te.t.Logf("Running test in %s environment...", te.e.name) sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)} if te.maxServerMsgSize != nil { @@ -698,6 +699,7 @@ func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listene // startServer starts a gRPC server exposing the provided TestService // implementation. Callers should defer a call to te.tearDown to clean up func (te *test) startServer(ts testpb.TestServiceServer) { + te.t.Helper() te.listenAndServe(ts, net.Listen) } @@ -4708,208 +4710,6 @@ func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) { } } -type clientTimeoutCreds struct { - timeoutReturned bool -} - -func (c *clientTimeoutCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { - if !c.timeoutReturned { - c.timeoutReturned = true - return nil, nil, context.DeadlineExceeded - } - return rawConn, nil, nil -} -func (c *clientTimeoutCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { - return rawConn, nil, nil -} -func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo { - return credentials.ProtocolInfo{} -} -func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials { - return nil -} -func (c *clientTimeoutCreds) OverrideServerName(s string) error { - return nil -} - -func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) { - te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds"}) - te.userAgent = testAppUA - te.startServer(&testServer{security: te.e.security}) - defer te.tearDown() - - cc := te.clientConn() - tc := testpb.NewTestServiceClient(cc) - // This unary call should succeed, because ClientHandshake will succeed for the second time. 
-	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
-		te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err)
-	}
-}
-
-type serverDispatchCred struct {
-	rawConnCh chan net.Conn
-}
-
-func newServerDispatchCred() *serverDispatchCred {
-	return &serverDispatchCred{
-		rawConnCh: make(chan net.Conn, 1),
-	}
-}
-func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
-	return rawConn, nil, nil
-}
-func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
-	select {
-	case c.rawConnCh <- rawConn:
-	default:
-	}
-	return nil, nil, credentials.ErrConnDispatched
-}
-func (c *serverDispatchCred) Info() credentials.ProtocolInfo {
-	return credentials.ProtocolInfo{}
-}
-func (c *serverDispatchCred) Clone() credentials.TransportCredentials {
-	return nil
-}
-func (c *serverDispatchCred) OverrideServerName(s string) error {
-	return nil
-}
-func (c *serverDispatchCred) getRawConn() net.Conn {
-	return <-c.rawConnCh
-}
-
-func (s) TestServerCredsDispatch(t *testing.T) {
-	lis, err := net.Listen("tcp", "localhost:0")
-	if err != nil {
-		t.Fatalf("Failed to listen: %v", err)
-	}
-	cred := newServerDispatchCred()
-	s := grpc.NewServer(grpc.Creds(cred))
-	go s.Serve(lis)
-	defer s.Stop()
-
-	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred))
-	if err != nil {
-		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
-	}
-	defer cc.Close()
-
-	rawConn := cred.getRawConn()
-	// Give grpc a chance to see the error and potentially close the connection.
-	// And check that connection is not closed after that.
-	time.Sleep(100 * time.Millisecond)
-	// Check rawConn is not closed.
-	if n, err := rawConn.Write([]byte{0}); n <= 0 || err != nil {
-		t.Errorf("Read() = %v, %v; want n>0, <nil>", n, err)
-	}
-}
-
-type authorityCheckCreds struct {
-	got string
-}
-
-func (c *authorityCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
-	return rawConn, nil, nil
-}
-func (c *authorityCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
-	c.got = authority
-	return rawConn, nil, nil
-}
-func (c *authorityCheckCreds) Info() credentials.ProtocolInfo {
-	return credentials.ProtocolInfo{}
-}
-func (c *authorityCheckCreds) Clone() credentials.TransportCredentials {
-	return c
-}
-func (c *authorityCheckCreds) OverrideServerName(s string) error {
-	return nil
-}
-
-// This test makes sure that the authority client handshake gets is the endpoint
-// in dial target, not the resolved ip address.
-func (s) TestCredsHandshakeAuthority(t *testing.T) { - const testAuthority = "test.auth.ori.ty" - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to listen: %v", err) - } - cred := &authorityCheckCreds{} - s := grpc.NewServer() - go s.Serve(lis) - defer s.Stop() - - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - - cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred)) - if err != nil { - t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) - } - defer cc.Close() - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - for { - s := cc.GetState() - if s == connectivity.Ready { - break - } - if !cc.WaitForStateChange(ctx, s) { - // ctx got timeout or canceled. - t.Fatalf("ClientConn is not ready after 100 ms") - } - } - - if cred.got != testAuthority { - t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority) - } -} - -// This test makes sure that the authority client handshake gets is the endpoint -// of the ServerName of the address when it is set. -func (s) TestCredsHandshakeServerNameAuthority(t *testing.T) { - const testAuthority = "test.auth.ori.ty" - const testServerName = "test.server.name" - - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to listen: %v", err) - } - cred := &authorityCheckCreds{} - s := grpc.NewServer() - go s.Serve(lis) - defer s.Stop() - - r, rcleanup := manual.GenerateAndRegisterManualResolver() - defer rcleanup() - - cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred)) - if err != nil { - t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) - } - defer cc.Close() - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String(), ServerName: testServerName}}}) - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - for { - s := cc.GetState() - if s == connectivity.Ready { - break - } - if !cc.WaitForStateChange(ctx, s) { - // ctx got timeout or canceled. - t.Fatalf("ClientConn is not ready after 100 ms") - } - } - - if cred.got != testServerName { - t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority) - } -} - type clientFailCreds struct{} func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { @@ -4952,7 +4752,6 @@ func (s) TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) { } } ->>>>>>> c9ad8dff... balancer: move Balancer and Picker to V2; delete legacy API func (s) TestFlowControlLogicalRace(t *testing.T) { // Test for a regression of https://github.com/grpc/grpc-go/issues/632, // and other flow control bugs. diff --git a/vet.sh b/vet.sh index 493e67b511e..2fa14169b37 100755 --- a/vet.sh +++ b/vet.sh @@ -125,12 +125,10 @@ staticcheck -go 1.9 -checks 'inherit,-ST1015' ./... > "${SC_OUT}" || true not grep -v "is deprecated:.*SA1019" "${SC_OUT}" # Only ignore the following deprecated types/fields/functions. 
not grep -Fv '.CredsBundle -.HandleResolvedAddrs -.HandleSubConnStateChange .HeaderMap +.Metadata is deprecated: use Attributes .NewAddress .NewServiceConfig -.Metadata is deprecated: use Attributes .Type is deprecated: use Attributes balancer.Picker grpc.CallCustomCodec @@ -143,9 +141,7 @@ grpc.NewGZIPCompressor grpc.NewGZIPDecompressor grpc.RPCCompressor grpc.RPCDecompressor -grpc.RoundRobin grpc.ServiceConfig -grpc.WithBalancer grpc.WithBalancerName grpc.WithCompressor grpc.WithDecompressor @@ -155,9 +151,6 @@ grpc.WithServiceConfig grpc.WithTimeout http.CloseNotifier info.SecurityVersion -naming.Resolver -naming.Update -naming.Watcher resolver.Backend resolver.GRPCLB' "${SC_OUT}" diff --git a/xds/internal/balancer/balancergroup/balancergroup.go b/xds/internal/balancer/balancergroup/balancergroup.go index fdc3f42ec5b..d6428afb96a 100644 --- a/xds/internal/balancer/balancergroup/balancergroup.go +++ b/xds/internal/balancer/balancergroup/balancergroup.go @@ -123,8 +123,7 @@ func (sbc *subBalancerWithConfig) updateClientConnState(s balancer.ClientConnSta // it's the lower priority, but it can still get address updates. return nil } - return ub.UpdateClientConnState(s) - return nil + return b.UpdateClientConnState(s) } func (sbc *subBalancerWithConfig) stopBalancer() { diff --git a/xds/internal/balancer/balancergroup/balancergroup_test.go b/xds/internal/balancer/balancergroup/balancergroup_test.go index f2804abf50e..8c211c04161 100644 --- a/xds/internal/balancer/balancergroup/balancergroup_test.go +++ b/xds/internal/balancer/balancergroup/balancergroup_test.go @@ -49,7 +49,7 @@ func init() { DefaultSubBalancerCloseTimeout = time.Millisecond } -func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn { +func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { return func() balancer.SubConn { scst, _ := p.Pick(balancer.PickInfo{}) return scst.SubConn diff --git a/xds/internal/balancer/edsbalancer/balancergroup_test.go b/xds/internal/balancer/edsbalancer/balancergroup_test.go deleted file mode 100644 index e680455be6d..00000000000 --- a/xds/internal/balancer/edsbalancer/balancergroup_test.go +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package edsbalancer - -import ( - "fmt" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal" - orcapb "google.golang.org/grpc/xds/internal/proto/udpa/data/orca/v1" -) - -var ( - rrBuilder = balancer.Get(roundrobin.Name) - testBalancerIDs = []internal.Locality{{Region: "b1"}, {Region: "b2"}, {Region: "b3"}} - testBackendAddrs []resolver.Address -) - -const testBackendAddrsCount = 12 - -func init() { - for i := 0; i < testBackendAddrsCount; i++ { - testBackendAddrs = append(testBackendAddrs, resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)}) - } - - // Disable caching for all tests. It will be re-enabled in caching specific - // tests. - defaultSubBalancerCloseTimeout = time.Millisecond -} - -func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { - return func() balancer.SubConn { - scst, _ := p.Pick(balancer.PickInfo{}) - return scst.SubConn - } -} - -// 1 balancer, 1 backend -> 2 backends -> 1 backend. -func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - bg.start() - - // Add one balancer to group. - bg.add(testBalancerIDs[0], 1, rrBuilder) - // Send one resolved address. - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1]) - - // Send subconn state change. - sc1 := <-cc.newSubConnCh - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - - // Test pick with one backend. - p1 := <-cc.newPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p1.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) - } - } - - // Send two addresses. - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - // Expect one new subconn, send state update. - sc2 := <-cc.newSubConnCh - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - - // Test roundrobin pick. - p2 := <-cc.newPickerCh - want := []balancer.SubConn{sc1, sc2} - if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Remove the first address. - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[1:2]) - scToRemove := <-cc.removeSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) - } - bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown) - - // Test pick with only the second subconn. - p3 := <-cc.newPickerCh - for i := 0; i < 5; i++ { - gotSC, _ := p3.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSC.SubConn, sc2, cmp.AllowUnexported(testSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2) - } - } -} - -// 2 balancers, each with 1 backend. -func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - bg.start() - - // Add two balancers to group and send one resolved address to both - // balancers. 
- bg.add(testBalancerIDs[0], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1]) - sc1 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[0:1]) - sc2 := <-cc.newSubConnCh - - // Send state changes for both subconns. - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - - // Test roundrobin on the last picker. - p1 := <-cc.newPickerCh - want := []balancer.SubConn{sc1, sc2} - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } -} - -// 2 balancers, each with more than 1 backends. -func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - bg.start() - - // Add two balancers to group and send one resolved address to both - // balancers. - bg.add(testBalancerIDs[0], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - sc1 := <-cc.newSubConnCh - sc2 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) - sc3 := <-cc.newSubConnCh - sc4 := <-cc.newSubConnCh - - // Send state changes for both subconns. - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - bg.handleSubConnStateChange(sc3, connectivity.Connecting) - bg.handleSubConnStateChange(sc3, connectivity.Ready) - bg.handleSubConnStateChange(sc4, connectivity.Connecting) - bg.handleSubConnStateChange(sc4, connectivity.Ready) - - // Test roundrobin on the last picker. - p1 := <-cc.newPickerCh - want := []balancer.SubConn{sc1, sc2, sc3, sc4} - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Turn sc2's connection down, should be RR between balancers. - bg.handleSubConnStateChange(sc2, connectivity.TransientFailure) - p2 := <-cc.newPickerCh - // Expect two sc1's in the result, because balancer1 will be picked twice, - // but there's only one sc in it. - want = []balancer.SubConn{sc1, sc1, sc3, sc4} - if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Remove sc3's addresses. - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[3:4]) - scToRemove := <-cc.removeSubConnCh - if !cmp.Equal(scToRemove, sc3, cmp.AllowUnexported(testSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scToRemove) - } - bg.handleSubConnStateChange(scToRemove, connectivity.Shutdown) - p3 := <-cc.newPickerCh - want = []balancer.SubConn{sc1, sc4} - if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Turn sc1's connection down. - bg.handleSubConnStateChange(sc1, connectivity.TransientFailure) - p4 := <-cc.newPickerCh - want = []balancer.SubConn{sc4} - if err := isRoundRobin(want, subConnFromPicker(p4)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Turn last connection to connecting. 
- bg.handleSubConnStateChange(sc4, connectivity.Connecting) - p5 := <-cc.newPickerCh - for i := 0; i < 5; i++ { - if _, err := p5.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } - } - - // Turn all connections down. - bg.handleSubConnStateChange(sc4, connectivity.TransientFailure) - p6 := <-cc.newPickerCh - for i := 0; i < 5; i++ { - if _, err := p6.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { - t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) - } - } -} - -// 2 balancers with different weights. -func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - bg.start() - - // Add two balancers to group and send two resolved addresses to both - // balancers. - bg.add(testBalancerIDs[0], 2, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - sc1 := <-cc.newSubConnCh - sc2 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) - sc3 := <-cc.newSubConnCh - sc4 := <-cc.newSubConnCh - - // Send state changes for both subconns. - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - bg.handleSubConnStateChange(sc3, connectivity.Connecting) - bg.handleSubConnStateChange(sc3, connectivity.Ready) - bg.handleSubConnStateChange(sc4, connectivity.Connecting) - bg.handleSubConnStateChange(sc4, connectivity.Ready) - - // Test roundrobin on the last picker. - p1 := <-cc.newPickerCh - want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } -} - -// totally 3 balancers, add/remove balancer. -func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - bg.start() - - // Add three balancers to group and send one resolved address to both - // balancers. - bg.add(testBalancerIDs[0], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1]) - sc1 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[1:2]) - sc2 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[2], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:2]) - sc3 := <-cc.newSubConnCh - - // Send state changes for both subconns. - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - bg.handleSubConnStateChange(sc3, connectivity.Connecting) - bg.handleSubConnStateChange(sc3, connectivity.Ready) - - p1 := <-cc.newPickerCh - want := []balancer.SubConn{sc1, sc2, sc3} - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Remove the second balancer, while the others two are ready. 
- -// 3 balancers in total; add and remove a balancer. -func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - bg.start() - - // Add three balancers to the group and send one resolved address to each - // balancer. - bg.add(testBalancerIDs[0], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1]) - sc1 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[1:2]) - sc2 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[2], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:2]) - sc3 := <-cc.newSubConnCh - - // Send state changes for all subconns. - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - bg.handleSubConnStateChange(sc3, connectivity.Connecting) - bg.handleSubConnStateChange(sc3, connectivity.Ready) - - p1 := <-cc.newPickerCh - want := []balancer.SubConn{sc1, sc2, sc3} - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Remove the second balancer, while the other two are ready. - bg.remove(testBalancerIDs[1]) - scToRemove := <-cc.removeSubConnCh - if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove) - } - p2 := <-cc.newPickerCh - want = []balancer.SubConn{sc1, sc3} - if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - // Move balancer 3 into transient failure. - bg.handleSubConnStateChange(sc3, connectivity.TransientFailure) - // Remove the first balancer, while the third is in transient failure. - bg.remove(testBalancerIDs[0]) - scToRemove = <-cc.removeSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) - } - p3 := <-cc.newPickerCh - for i := 0; i < 5; i++ { - if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { - t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) - } - } -} - -// 2 balancers, change balancer weight. -func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - bg.start() - - // Add two balancers to the group and send two resolved addresses to both - // balancers. - bg.add(testBalancerIDs[0], 2, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - sc1 := <-cc.newSubConnCh - sc2 := <-cc.newSubConnCh - - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) - sc3 := <-cc.newSubConnCh - sc4 := <-cc.newSubConnCh - - // Send state changes for all subconns. - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - bg.handleSubConnStateChange(sc3, connectivity.Connecting) - bg.handleSubConnStateChange(sc3, connectivity.Ready) - bg.handleSubConnStateChange(sc4, connectivity.Connecting) - bg.handleSubConnStateChange(sc4, connectivity.Ready) - - // Test roundrobin on the last picker. - p1 := <-cc.newPickerCh - want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - bg.changeWeight(testBalancerIDs[0], 3) - - // Test roundrobin with new weight. - p2 := <-cc.newPickerCh - want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4} - if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } -} - -func (s) TestBalancerGroup_LoadReport(t *testing.T) { - testLoadStore := newTestLoadStore() - - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, testLoadStore, nil) - bg.start() - - backendToBalancerID := make(map[balancer.SubConn]internal.Locality) - - // Add two balancers to the group and send two resolved addresses to both - // balancers.
- bg.add(testBalancerIDs[0], 2, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - sc1 := <-cc.newSubConnCh - sc2 := <-cc.newSubConnCh - backendToBalancerID[sc1] = testBalancerIDs[0] - backendToBalancerID[sc2] = testBalancerIDs[0] - - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) - sc3 := <-cc.newSubConnCh - sc4 := <-cc.newSubConnCh - backendToBalancerID[sc3] = testBalancerIDs[1] - backendToBalancerID[sc4] = testBalancerIDs[1] - - // Send state changes for all subconns. - bg.handleSubConnStateChange(sc1, connectivity.Connecting) - bg.handleSubConnStateChange(sc1, connectivity.Ready) - bg.handleSubConnStateChange(sc2, connectivity.Connecting) - bg.handleSubConnStateChange(sc2, connectivity.Ready) - bg.handleSubConnStateChange(sc3, connectivity.Connecting) - bg.handleSubConnStateChange(sc3, connectivity.Ready) - bg.handleSubConnStateChange(sc4, connectivity.Connecting) - bg.handleSubConnStateChange(sc4, connectivity.Ready) - - // Test roundrobin on the last picker. - p1 := <-cc.newPickerCh - var ( - wantStart []internal.Locality - wantEnd []internal.Locality - wantCost []testServerLoad - ) - for i := 0; i < 10; i++ { - scst, _ := p1.Pick(balancer.PickInfo{}) - locality := backendToBalancerID[scst.SubConn] - wantStart = append(wantStart, locality) - if scst.Done != nil && scst.SubConn != sc1 { - scst.Done(balancer.DoneInfo{ - ServerLoad: &orcapb.OrcaLoadReport{ - CpuUtilization: 10, - MemUtilization: 5, - RequestCost: map[string]float64{"pic": 3.14}, - Utilization: map[string]float64{"piu": 3.14}, - }, - }) - wantEnd = append(wantEnd, locality) - wantCost = append(wantCost, - testServerLoad{name: serverLoadCPUName, d: 10}, - testServerLoad{name: serverLoadMemoryName, d: 5}, - testServerLoad{name: "pic", d: 3.14}, - testServerLoad{name: "piu", d: 3.14}) - } - } - - if !cmp.Equal(testLoadStore.callsStarted, wantStart) { - t.Fatalf("want started: %v, got: %v", wantStart, testLoadStore.callsStarted) - } - if !cmp.Equal(testLoadStore.callsEnded, wantEnd) { - t.Fatalf("want ended: %v, got: %v", wantEnd, testLoadStore.callsEnded) - } - if !cmp.Equal(testLoadStore.callsCost, wantCost, cmp.AllowUnexported(testServerLoad{})) { - t.Fatalf("want cost: %v, got: %v", wantCost, testLoadStore.callsCost) - } -}
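The load-report assertions above hinge on the picker's Done callback: the pick records a call start, and Done records the end plus any ORCA load report the server attached. The sketch below shows that wiring under the assumptions of this file's test doubles (testLoadStore, internal.Locality, and the orcapb proto these tests import); it is not the actual picker code.

```go
// pickDone records the start of an RPC immediately and returns the Done
// callback that records its end and any server load. Sketch only; it
// compiles alongside the test helpers above.
func pickDone(store *testLoadStore, locality internal.Locality) func(balancer.DoneInfo) {
	store.CallStarted(locality)
	return func(di balancer.DoneInfo) {
		store.CallFinished(locality, di.Err)
		load, ok := di.ServerLoad.(*orcapb.OrcaLoadReport)
		if !ok {
			return
		}
		// These are exactly the fields the test asserts on: CPU, memory,
		// per-request costs, and named utilizations.
		store.CallServerLoad(locality, serverLoadCPUName, load.CpuUtilization)
		store.CallServerLoad(locality, serverLoadMemoryName, load.MemUtilization)
		for name, cost := range load.RequestCost {
			store.CallServerLoad(locality, name, cost)
		}
		for name, util := range load.Utilization {
			store.CallServerLoad(locality, name, util)
		}
	}
}
```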
- -// Create a new balancer group, add balancers and backends, but do not start it. -// - b1, weight 2, backends [0,1] -// - b2, weight 1, backends [2,3] -// Start the balancer group and check behavior. -// -// Close the balancer group, call add/remove/change weight/change address. -// - b2, weight 3, backends [0,3] -// - b3, weight 1, backends [1,2] -// Start the balancer group again and check the behavior. -func (s) TestBalancerGroup_start_close(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - - // Add two balancers to the group and send two resolved addresses to both - // balancers. - bg.add(testBalancerIDs[0], 2, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) - - bg.start() - - m1 := make(map[resolver.Address]balancer.SubConn) - for i := 0; i < 4; i++ { - addrs := <-cc.newSubConnAddrsCh - sc := <-cc.newSubConnCh - m1[addrs[0]] = sc - bg.handleSubConnStateChange(sc, connectivity.Connecting) - bg.handleSubConnStateChange(sc, connectivity.Ready) - } - - // Test roundrobin on the last picker. - p1 := <-cc.newPickerCh - want := []balancer.SubConn{ - m1[testBackendAddrs[0]], m1[testBackendAddrs[0]], - m1[testBackendAddrs[1]], m1[testBackendAddrs[1]], - m1[testBackendAddrs[2]], m1[testBackendAddrs[3]], - } - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - bg.close() - for i := 0; i < 4; i++ { - bg.handleSubConnStateChange(<-cc.removeSubConnCh, connectivity.Shutdown) - } - - // Add b3, weight 1, backends [1,2]. - bg.add(testBalancerIDs[2], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:3]) - - // Remove b1. - bg.remove(testBalancerIDs[0]) - - // Update b2 to weight 3, backends [0,3]. - bg.changeWeight(testBalancerIDs[1], 3) - bg.handleResolvedAddrs(testBalancerIDs[1], append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])) - - bg.start() - - m2 := make(map[resolver.Address]balancer.SubConn) - for i := 0; i < 4; i++ { - addrs := <-cc.newSubConnAddrsCh - sc := <-cc.newSubConnCh - m2[addrs[0]] = sc - bg.handleSubConnStateChange(sc, connectivity.Connecting) - bg.handleSubConnStateChange(sc, connectivity.Ready) - } - - // Test roundrobin on the last picker. - p2 := <-cc.newPickerCh - want = []balancer.SubConn{ - m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], - m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], - m2[testBackendAddrs[1]], m2[testBackendAddrs[2]], - } - if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } -} - -// Test that balancer group start() doesn't deadlock if the balancer calls back -// into balancer group inline when it gets an update. -// -// The potential deadlock can happen if we -// - hold a lock and send updates to balancer (e.g. update resolved addresses) -// - the balancer calls back (NewSubConn or update picker) inline -// The callback will try to hold the same lock again, which will cause a -// deadlock. -// -// This test starts the balancer group with a test balancer that updates the -// picker whenever it gets an address update. It's expected that start() -// doesn't block because of the deadlock. -func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - - bg.add(testBalancerIDs[0], 2, &testConstBalancerBuilder{}) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - bg.add(testBalancerIDs[1], 1, &testConstBalancerBuilder{}) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) - - bg.start() -}
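The deadlock this test guards against is the classic hold-a-lock-while-calling-out pattern, distilled here into a standalone sketch (illustrative names, not balancer-group code): sync.Mutex is not reentrant, so a synchronous callback that re-enters the locked component blocks its own goroutine forever.

```go
package main

import "sync"

type group struct {
	mu sync.Mutex
}

// update holds mu while invoking the callback. If cb synchronously calls
// back into onPickerUpdate, the same goroutine tries to take mu twice and
// deadlocks, because sync.Mutex is not reentrant.
func (g *group) update(cb func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	cb()
}

func (g *group) onPickerUpdate() {
	g.mu.Lock() // blocks forever when reached from inside update
	defer g.mu.Unlock()
}

func main() {
	g := &group{}
	g.update(g.onPickerUpdate) // never returns
}
```

The usual fixes are to drop the lock before delivering updates or to hand updates off to another goroutine; the test only pins down the observable behavior (start() returns), not which approach the implementation takes.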
- -func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() { - old := defaultSubBalancerCloseTimeout - defaultSubBalancerCloseTimeout = n - return func() { defaultSubBalancerCloseTimeout = old } -} - -// initBalancerGroupForCachingTest creates a balancer group, and initializes it -// to be ready for caching tests. -// -// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer -// is removed later, so the balancer group returned has one sub-balancer in its -// own map, and one sub-balancer in cache. -func initBalancerGroupForCachingTest(t *testing.T) (*balancerGroup, *testClientConn, map[resolver.Address]balancer.SubConn) { - cc := newTestClientConn(t) - bg := newBalancerGroup(cc, nil, nil) - - // Add two balancers to the group and send two resolved addresses to both - // balancers. - bg.add(testBalancerIDs[0], 2, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2]) - bg.add(testBalancerIDs[1], 1, rrBuilder) - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4]) - - bg.start() - - m1 := make(map[resolver.Address]balancer.SubConn) - for i := 0; i < 4; i++ { - addrs := <-cc.newSubConnAddrsCh - sc := <-cc.newSubConnCh - m1[addrs[0]] = sc - bg.handleSubConnStateChange(sc, connectivity.Connecting) - bg.handleSubConnStateChange(sc, connectivity.Ready) - } - - // Test roundrobin on the last picker. - p1 := <-cc.newPickerCh - want := []balancer.SubConn{ - m1[testBackendAddrs[0]], m1[testBackendAddrs[0]], - m1[testBackendAddrs[1]], m1[testBackendAddrs[1]], - m1[testBackendAddrs[2]], m1[testBackendAddrs[3]], - } - if err := isRoundRobin(want, subConnFromPicker(p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - bg.remove(testBalancerIDs[1]) - // Don't wait for SubConns to be removed after close, because they are only - // removed after the close timeout. - for i := 0; i < 10; i++ { - select { - case <-cc.removeSubConnCh: - t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)") - default: - } - time.Sleep(time.Millisecond) - } - // Test roundrobin with only sub-balancer0. - p2 := <-cc.newPickerCh - want = []balancer.SubConn{ - m1[testBackendAddrs[0]], m1[testBackendAddrs[1]], - } - if err := isRoundRobin(want, subConnFromPicker(p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - return bg, cc, m1 -} - -// Test that if a sub-balancer is removed, and re-added within close timeout, -// the subConns won't be re-created. -func (s) TestBalancerGroup_locality_caching(t *testing.T) { - defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)() - bg, cc, addrToSC := initBalancerGroupForCachingTest(t) - - // Turn down subconn for addr2, shouldn't get picker update because - // sub-balancer1 was removed. - bg.handleSubConnStateChange(addrToSC[testBackendAddrs[2]], connectivity.TransientFailure) - for i := 0; i < 10; i++ { - select { - case <-cc.newPickerCh: - t.Fatalf("Got new picker, want no new picker (because the sub-balancer was removed)") - default: - } - time.Sleep(time.Millisecond) - } - - // Sleep, but sleep less than the close timeout. - time.Sleep(time.Millisecond * 100) - - // Re-add sub-balancer-1; because subconns were in cache, no new subconns - // should be created. But a new picker will still be generated, with subconn - // states up to date. - bg.add(testBalancerIDs[1], 1, rrBuilder) - - p3 := <-cc.newPickerCh - want := []balancer.SubConn{ - addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]], - addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]], - // addr2 is down, b2 only has addr3 in READY state. - addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]], - } - if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } - - for i := 0; i < 10; i++ { - select { - case <-cc.newSubConnAddrsCh: - t.Fatalf("Got new subconn, want no new subconn (because subconns were still in cache)") - default: - } - time.Sleep(time.Millisecond * 10) - } -} - -// Sub-balancers are put in cache when they are removed. If balancer group is -// closed within the close timeout, all subconns should still be removed -// immediately.
-func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) { - defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)() - bg, cc, addrToSC := initBalancerGroupForCachingTest(t) - - bg.close() - // The balancer group is closed. The subconns should be removed immediately. - removeTimeout := time.After(time.Millisecond * 500) - scToRemove := map[balancer.SubConn]int{ - addrToSC[testBackendAddrs[0]]: 1, - addrToSC[testBackendAddrs[1]]: 1, - addrToSC[testBackendAddrs[2]]: 1, - addrToSC[testBackendAddrs[3]]: 1, - } - for i := 0; i < len(scToRemove); i++ { - select { - case sc := <-cc.removeSubConnCh: - c := scToRemove[sc] - if c == 0 { - t.Fatalf("Got removeSubConn for %v when %d removes were expected", sc, c) - } - scToRemove[sc] = c - 1 - case <-removeTimeout: - t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed") - } - } -} - -// Sub-balancers in cache will be closed if not re-added within the timeout, -// and subConns will be removed. -func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) { - defer replaceDefaultSubBalancerCloseTimeout(time.Second)() - _, cc, addrToSC := initBalancerGroupForCachingTest(t) - - // The sub-balancer is not re-added within the timeout. The subconns should - // be removed. - removeTimeout := time.After(defaultSubBalancerCloseTimeout) - scToRemove := map[balancer.SubConn]int{ - addrToSC[testBackendAddrs[2]]: 1, - addrToSC[testBackendAddrs[3]]: 1, - } - for i := 0; i < len(scToRemove); i++ { - select { - case sc := <-cc.removeSubConnCh: - c := scToRemove[sc] - if c == 0 { - t.Fatalf("Got removeSubConn for %v when %d removes were expected", sc, c) - } - scToRemove[sc] = c - 1 - case <-removeTimeout: - t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed") - } - } -} - -// Wrap the rr builder, so it behaves the same, but has a different pointer. -type noopBalancerBuilderWrapper struct { - balancer.Builder -} - -// After removing a sub-balancer, re-add with same ID, but different balancer -// builder. Old subconns should be removed, and new subconns should be created. -func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) { - defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)() - bg, cc, addrToSC := initBalancerGroupForCachingTest(t) - - // Re-add sub-balancer-1, but with a different balancer builder. The - // sub-balancer was still in cache, but can't be reused. This should cause - // the old sub-balancer's subconns to be removed immediately, and new - // subconns to be created. - bg.add(testBalancerIDs[1], 1, &noopBalancerBuilderWrapper{rrBuilder}) - - // The cached sub-balancer should be closed, and the subconns should be - // removed immediately.
- removeTimeout := time.After(time.Millisecond * 500) - scToRemove := map[balancer.SubConn]int{ - addrToSC[testBackendAddrs[2]]: 1, - addrToSC[testBackendAddrs[3]]: 1, - } - for i := 0; i < len(scToRemove); i++ { - select { - case sc := <-cc.removeSubConnCh: - c := scToRemove[sc] - if c == 0 { - t.Fatalf("Got removeSubConn for %v when %d removes were expected", sc, c) - } - scToRemove[sc] = c - 1 - case <-removeTimeout: - t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed") - } - } - - bg.handleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[4:6]) - - newSCTimeout := time.After(time.Millisecond * 500) - scToAdd := map[resolver.Address]int{ - testBackendAddrs[4]: 1, - testBackendAddrs[5]: 1, - } - for i := 0; i < len(scToAdd); i++ { - select { - case addr := <-cc.newSubConnAddrsCh: - c := scToAdd[addr[0]] - if c == 0 { - t.Fatalf("Got newSubConn for %v when %d new subconns were expected", addr, c) - } - scToAdd[addr[0]] = c - 1 - sc := <-cc.newSubConnCh - addrToSC[addr[0]] = sc - bg.handleSubConnStateChange(sc, connectivity.Connecting) - bg.handleSubConnStateChange(sc, connectivity.Ready) - case <-newSCTimeout: - t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be created") - } - } - - // Test roundrobin on the new picker. - p3 := <-cc.newPickerCh - want := []balancer.SubConn{ - addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]], - addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]], - addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]], - } - if err := isRoundRobin(want, subConnFromPicker(p3)); err != nil { - t.Fatalf("want %v, got %v", want, err) - } -}
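Taken together, the caching tests above pin down a contract: a removed sub-balancer lingers for defaultSubBalancerCloseTimeout, a re-add with the same builder revives it together with its SubConns, and a group close or a re-add with a different builder discards it immediately. A rough, self-contained sketch of that delayed-close shape, with hypothetical names rather than the balancergroup implementation:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// subBalancerCache is an illustrative stand-in for the delayed-close
// behavior the tests above describe; names and shape are hypothetical.
type subBalancerCache struct {
	mu      sync.Mutex
	timeout time.Duration
	entries map[string]*time.Timer
}

// remove schedules closeFn instead of running it now; the sub-balancer's
// SubConns stay alive until the timer fires.
func (c *subBalancerCache) remove(id string, closeFn func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[id] = time.AfterFunc(c.timeout, func() {
		c.mu.Lock()
		_, ok := c.entries[id]
		delete(c.entries, id)
		c.mu.Unlock()
		if ok { // re-check under the lock: a concurrent revive wins the race
			closeFn()
		}
	})
}

// revive cancels a pending close and reports whether the entry was still
// cached, i.e. whether the old sub-balancer and its SubConns can be reused.
func (c *subBalancerCache) revive(id string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	t, ok := c.entries[id]
	if !ok {
		return false
	}
	t.Stop()
	delete(c.entries, id)
	return true
}

func main() {
	c := &subBalancerCache{timeout: time.Second, entries: map[string]*time.Timer{}}
	c.remove("b1", func() { fmt.Println("closed: remove SubConns now") })
	fmt.Println(c.revive("b1")) // true: re-added in time, SubConns reused
}
```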
diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 6fbffd234de..1e449430697 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -383,58 +383,7 @@ func (s) TestClose(t *testing.T) { edsb := newEDSBalancerImpl(nil, nil, nil, nil) // This is what could happen when switching between fallback and eds. This // makes sure it doesn't panic. - edsb.Close() -} - -func init() { - balancer.Register(&testConstBalancerBuilder{}) -} - -var errTestConstPicker = fmt.Errorf("const picker error") - -type testConstBalancerBuilder struct{} - -func (*testConstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - return &testConstBalancer{cc: cc} -} - -func (*testConstBalancerBuilder) Name() string { - return "test-const-balancer" -} - -type testConstBalancer struct { - cc balancer.ClientConn -} - -func (tb *testConstBalancer) ResolverError(error) { - panic("not implemented") -} - -func (tb *testConstBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { - tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &testConstPicker{err: errTestConstPicker}}) -} - -func (tb *testConstBalancer) UpdateClientConnState(s balancer.ClientConnState) error { - a := s.ResolverState.Addresses - if len(a) > 0 { - tb.cc.NewSubConn(a, balancer.NewSubConnOptions{}) - } - return nil -} - -func (*testConstBalancer) Close() { -} - -type testConstPicker struct { - err error - sc balancer.SubConn -} - -func (tcp *testConstPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - if tcp.err != nil { - return balancer.PickResult{}, tcp.err - } - return balancer.PickResult{SubConn: tcp.sc}, nil + edsb.close() } // Create XDS balancer, and update sub-balancer before handling eds responses. diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index eab7d137d87..5f8b04d6820 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -55,7 +55,7 @@ func init() { } } -func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn { +func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { return func() balancer.SubConn { scst, _ := p.Pick(balancer.PickInfo{}) return scst.SubConn diff --git a/xds/internal/balancer/edsbalancer/test_util_test.go b/xds/internal/balancer/edsbalancer/test_util_test.go deleted file mode 100644 index e2b3c56a381..00000000000 --- a/xds/internal/balancer/edsbalancer/test_util_test.go +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License
- */ - -package edsbalancer - -import ( - "context" - "fmt" - "testing" - - "google.golang.org/grpc" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" -) - -const testSubConnsCount = 16 - -var testSubConns []*testSubConn - -func init() { - for i := 0; i < testSubConnsCount; i++ { - testSubConns = append(testSubConns, &testSubConn{ - id: fmt.Sprintf("sc%d", i), - }) - } -} - -type testSubConn struct { - id string -} - -func (tsc *testSubConn) UpdateAddresses([]resolver.Address) { - panic("not implemented") -} - -func (tsc *testSubConn) Connect() { -} - -// Implement stringer to get human friendly error message. -func (tsc *testSubConn) String() string { - return tsc.id -} - -type testClientConn struct { - t *testing.T // For logging only. - - newSubConnAddrsCh chan []resolver.Address // The last 10 []Address to create subconn. - newSubConnCh chan balancer.SubConn // The last 10 subconn created. - removeSubConnCh chan balancer.SubConn // The last 10 subconn removed. - - newPickerCh chan balancer.Picker // The last picker updated. - newStateCh chan connectivity.State // The last state. - - subConnIdx int -} - -func newTestClientConn(t *testing.T) *testClientConn { - return &testClientConn{ - t: t, - - newSubConnAddrsCh: make(chan []resolver.Address, 10), - newSubConnCh: make(chan balancer.SubConn, 10), - removeSubConnCh: make(chan balancer.SubConn, 10), - - newPickerCh: make(chan balancer.Picker, 1), - newStateCh: make(chan connectivity.State, 1), - } -} - -func (tcc *testClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) { - sc := testSubConns[tcc.subConnIdx] - tcc.subConnIdx++ - - tcc.t.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc) - select { - case tcc.newSubConnAddrsCh <- a: - default: - } - - select { - case tcc.newSubConnCh <- sc: - default: - } - - return sc, nil -} - -func (tcc *testClientConn) RemoveSubConn(sc balancer.SubConn) { - tcc.t.Logf("testClientConn: RemoveSubConn(%p)", sc) - select { - case tcc.removeSubConnCh <- sc: - default: - } -} - -func (tcc *testClientConn) UpdateState(bs balancer.State) { - tcc.t.Logf("testClientConn: UpdateState(%v)", bs) - select { - case <-tcc.newStateCh: - default: - } - tcc.newStateCh <- bs.ConnectivityState - - select { - case <-tcc.newPickerCh: - default: - } - tcc.newPickerCh <- bs.Picker -} - -func (tcc *testClientConn) ResolveNow(resolver.ResolveNowOptions) { - panic("not implemented") -} - -func (tcc *testClientConn) Target() string { - panic("not implemented") -}
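UpdateState above uses a drain-then-send idiom on its capacity-1 channels, so a test reader always observes the most recent picker and state while a fast writer never blocks. The same idiom in isolation, with int standing in for the picker and state types; note it assumes a single writer, as concurrent writers would need a lock around the drain/send pair:

```go
package main

import "fmt"

// pushLatest keeps only the newest value in a capacity-1 channel: drain any
// stale value without blocking, then send the new one.
func pushLatest(ch chan int, v int) {
	select {
	case <-ch: // discard the stale value, if any
	default:
	}
	ch <- v
}

func main() {
	ch := make(chan int, 1)
	pushLatest(ch, 1)
	pushLatest(ch, 2) // overwrites 1 before anyone reads it
	fmt.Println(<-ch) // 2
}
```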
- -type testServerLoad struct { - name string - d float64 -} - -type testLoadStore struct { - callsStarted []internal.Locality - callsEnded []internal.Locality - callsCost []testServerLoad -} - -func newTestLoadStore() *testLoadStore { - return &testLoadStore{} -} - -func (*testLoadStore) CallDropped(category string) { - panic("not implemented") -} - -func (tls *testLoadStore) CallStarted(l internal.Locality) { - tls.callsStarted = append(tls.callsStarted, l) -} - -func (tls *testLoadStore) CallFinished(l internal.Locality, err error) { - tls.callsEnded = append(tls.callsEnded, l) -} - -func (tls *testLoadStore) CallServerLoad(l internal.Locality, name string, d float64) { - tls.callsCost = append(tls.callsCost, testServerLoad{name: name, d: d}) -} - -func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn, clusterName string, node *corepb.Node) { - panic("not implemented") -} - -// isRoundRobin checks whether f's return values form a roundrobin of the -// elements in want. It doesn't check the order. Note that want can contain -// duplicate items, which makes it weighted-round-robin. -// -// Step 1. the return values of f should form a permutation of all elements in -// want, but not necessarily in the same order. E.g. if want is {a,a,b}, the -// check fails if f returns: -// - {a,a,a}: third a is returned before b -// - {a,b,b}: second b is returned before the second a -// -// If an error is found in this step, the returned error contains only the -// first iteration until where it goes wrong. -// -// Step 2. the return values of f should be repetitions of the same permutation. -// E.g. if want is {a,a,b}, the check fails if f returns: -// - {a,b,a,b,a,a}: though it satisfies step 1, the second iteration is not -// repeating the first iteration. -// -// If an error is found in this step, the returned error contains the first -// iteration + the second iteration until where it goes wrong. -func isRoundRobin(want []balancer.SubConn, f func() balancer.SubConn) error { - wantSet := make(map[balancer.SubConn]int) // SubConn -> count, for weighted RR. - for _, sc := range want { - wantSet[sc]++ - } - - // The first iteration: makes sure f's return values form a permutation of - // elements in want. - // - // Also keep the return values in a slice, so we can compare the order in - // the second iteration. - gotSliceFirstIteration := make([]balancer.SubConn, 0, len(want)) - for range want { - got := f() - gotSliceFirstIteration = append(gotSliceFirstIteration, got) - wantSet[got]-- - if wantSet[got] < 0 { - return fmt.Errorf("non-roundrobin want: %v, result: %v", want, gotSliceFirstIteration) - } - } - - // The second iteration should repeat the first iteration. - var gotSliceSecondIteration []balancer.SubConn - for i := 0; i < 2; i++ { - for _, w := range gotSliceFirstIteration { - g := f() - gotSliceSecondIteration = append(gotSliceSecondIteration, g) - if w != g { - return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v", gotSliceFirstIteration, gotSliceSecondIteration) - } - } - } - - return nil -}
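Concretely, for want = {sc1, sc1, sc2} the checker accepts any single permutation repeated verbatim and rejects a sequence that passes step 1 but reorders between iterations. A small illustration against the helpers in this file (isRoundRobin above, testClosure and testSubConns nearby); it assumes the package's existing fmt import:

```go
// Sketch: exercises isRoundRobin's two-step contract with canned sequences.
func exampleIsRoundRobinContract() {
	want := []balancer.SubConn{testSubConns[0], testSubConns[0], testSubConns[1]}

	// {sc1, sc2, sc1} repeated forever is a valid permutation of want: nil error.
	ok := &testClosure{r: []balancer.SubConn{testSubConns[0], testSubConns[1], testSubConns[0]}}
	fmt.Println(isRoundRobin(want, ok.next)) // <nil>

	// First cycle {sc1, sc1, sc2} passes step 1, but the next cycle comes
	// back in a different order, so step 2 reports an error.
	bad := &testClosure{r: []balancer.SubConn{
		testSubConns[0], testSubConns[0], testSubConns[1],
		testSubConns[0], testSubConns[1], testSubConns[0],
	}}
	fmt.Println(isRoundRobin(want, bad.next) != nil) // true
}
```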
- -// testClosure is a test util for TestIsRoundRobin. -type testClosure struct { - r []balancer.SubConn - i int -} - -func (tc *testClosure) next() balancer.SubConn { - ret := tc.r[tc.i] - tc.i = (tc.i + 1) % len(tc.r) - return ret -} - -func (s) TestIsRoundRobin(t *testing.T) { - var ( - sc1 = testSubConns[0] - sc2 = testSubConns[1] - sc3 = testSubConns[2] - ) - - testCases := []struct { - desc string - want []balancer.SubConn - got []balancer.SubConn - pass bool - }{ - { - desc: "0 element", - want: []balancer.SubConn{}, - got: []balancer.SubConn{}, - pass: true, - }, - { - desc: "1 element RR", - want: []balancer.SubConn{sc1}, - got: []balancer.SubConn{sc1, sc1, sc1, sc1}, - pass: true, - }, - { - desc: "1 element not RR", - want: []balancer.SubConn{sc1}, - got: []balancer.SubConn{sc1, sc2, sc1}, - pass: false, - }, - { - desc: "2 elements RR", - want: []balancer.SubConn{sc1, sc2}, - got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2}, - pass: true, - }, - { - desc: "2 elements RR different order from want", - want: []balancer.SubConn{sc2, sc1}, - got: []balancer.SubConn{sc1, sc2, sc1, sc2, sc1, sc2}, - pass: true, - }, - { - desc: "2 elements RR not RR, mistake in first iter", - want: []balancer.SubConn{sc1, sc2}, - got: []balancer.SubConn{sc1, sc1, sc1, sc2, sc1, sc2}, - pass: false, - }, - { - desc: "2 elements RR not RR, mistake in second iter", - want: []balancer.SubConn{sc1, sc2}, - got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc1, sc2}, - pass: false, - }, - { - desc: "2 elements weighted RR", - want: []balancer.SubConn{sc1, sc1, sc2}, - got: []balancer.SubConn{sc1, sc1, sc2, sc1, sc1, sc2}, - pass: true, - }, - { - desc: "2 elements weighted RR different order", - want: []balancer.SubConn{sc1, sc1, sc2}, - got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1}, - pass: true, - }, - - { - desc: "3 elements RR", - want: []balancer.SubConn{sc1, sc2, sc3}, - got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc3, sc1, sc2, sc3}, - pass: true, - }, - { - desc: "3 elements RR different order", - want: []balancer.SubConn{sc1, sc2, sc3}, - got: []balancer.SubConn{sc3, sc2, sc1, sc3, sc2, sc1}, - pass: true, - }, - { - desc: "3 elements weighted RR", - want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3}, - got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1}, - pass: true, - }, - { - desc: "3 elements weighted RR not RR, mistake in first iter", - want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3}, - got: []balancer.SubConn{sc1, sc2, sc1, sc1, sc2, sc1, sc1, sc2, sc3, sc1, sc2, sc1}, - pass: false, - }, - { - desc: "3 elements weighted RR not RR, mistake in second iter", - want: []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc3}, - got: []balancer.SubConn{sc1, sc2, sc3, sc1, sc2, sc1, sc1, sc1, sc3, sc1, sc2, sc1}, - pass: false, - }, - } - for _, tC := range testCases { - t.Run(tC.desc, func(t *testing.T) { - err := isRoundRobin(tC.want, (&testClosure{r: tC.got}).next) - if err == nil != tC.pass { - t.Errorf("want pass %v, want %v, got err %v", tC.pass, tC.want, err) - } - }) - } -} diff --git a/xds/internal/testutils/balancer.go b/xds/internal/testutils/balancer.go index 604417c8211..ec3c4b5c6e9 100644 --- a/xds/internal/testutils/balancer.go +++ b/xds/internal/testutils/balancer.go @@ -70,7 +70,7 @@ type TestClientConn struct { NewSubConnCh chan balancer.SubConn // the last 10 subconn created. RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed. - NewPickerCh chan balancer.V2Picker // the last picker updated. + NewPickerCh chan balancer.Picker // the last picker updated.
NewStateCh chan connectivity.State // the last state. subConnIdx int @@ -85,7 +85,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn { NewSubConnCh: make(chan balancer.SubConn, 10), RemoveSubConnCh: make(chan balancer.SubConn, 10), - NewPickerCh: make(chan balancer.V2Picker, 1), + NewPickerCh: make(chan balancer.Picker, 1), NewStateCh: make(chan connectivity.State, 1), } } @@ -285,15 +285,20 @@ type testConstBalancer struct { cc balancer.ClientConn } -func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { +func (tb *testConstBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &TestConstPicker{Err: ErrTestConstPicker}}) } -func (tb *testConstBalancer) HandleResolvedAddrs(a []resolver.Address, err error) { - if len(a) == 0 { - return +func (tb *testConstBalancer) ResolverError(error) { + panic("not implemented") +} + +func (tb *testConstBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + if len(s.ResolverState.Addresses) == 0 { + return nil } - tb.cc.NewSubConn(a, balancer.NewSubConnOptions{}) + tb.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{}) + return nil } func (*testConstBalancer) Close() {