diff --git a/balancer/balancer.go b/balancer/balancer.go index 8f00523c0e2..30ba1811ad9 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -105,8 +105,8 @@ type SubConn interface { // // This will trigger a state transition for the SubConn. // - // Deprecated: This method is now part of the ClientConn interface and will - // eventually be removed from here. + // Deprecated: this method will be removed. Create new SubConns for new + // addresses instead. UpdateAddresses([]resolver.Address) // Connect starts the connecting for this SubConn. Connect() @@ -150,6 +150,9 @@ type ClientConn interface { // NewSubConn is called by balancer to create a new SubConn. // It doesn't block and wait for the connections to be established. // Behaviors of the SubConn can be controlled by options. + // + // Deprecated: please be aware that in a future version, SubConns will only + // support one address per SubConn. NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error) // RemoveSubConn removes the SubConn from ClientConn. // The SubConn will be shutdown. @@ -159,7 +162,10 @@ type ClientConn interface { // If so, the connection will be kept. Else, the connection will be // gracefully closed, and a new connection will be created. // - // This will trigger a state transition for the SubConn. + // This may trigger a state transition for the SubConn. + // + // Deprecated: this method will be removed. Create new SubConns for new + // addresses instead. UpdateAddresses(SubConn, []resolver.Address) // UpdateState notifies gRPC that the balancer's internal state has diff --git a/internal/internal.go b/internal/internal.go index 42ff39c8444..3259fb079f7 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -164,6 +164,10 @@ var ( // ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY. ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions) + + // GRPCResolverSchemeExtraMetadata determines when gRPC will add extra + // metadata to RPCs. + GRPCResolverSchemeExtraMetadata string = "xds" ) // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/resolver/resolver.go b/resolver/resolver.go index 459dfec195e..f3310bc6c54 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -104,6 +104,9 @@ type Address struct { // BalancerAttributes contains arbitrary data about this address intended // for consumption by the LB policy. These attributes do not affect SubConn // creation, connection establishment, handshaking, etc. + // + // Deprecated: when an Address is inside an Endpoint, this field should not + // be used, and it will eventually be removed entirely. BalancerAttributes *attributes.Attributes // Metadata is the information associated with Addr, which may be used @@ -167,11 +170,37 @@ type BuildOptions struct { Dialer func(context.Context, string) (net.Conn, error) } +// An Endpoint is one network endpoint, or server, which may have multiple +// addresses with which it can be accessed. +type Endpoint struct { + // Addresses contains a list of addresses used to access this endpoint. + Addresses []Address + + // Attributes contains arbitrary data about this endpoint intended for + // consumption by the LB policy. + Attributes *attributes.Attributes +} + // State contains the current Resolver state relevant to the ClientConn. type State struct { // Addresses is the latest set of resolved addresses for the target. + // + // If a resolver sets Addresses but does not set Endpoints, one Endpoint + // will be created for each Address before the State is passed to the LB + // policy. The BalancerAttributes of each entry in Addresses will be set + // in Endpoints.Attributes, and be cleared in the Endpoint's Address's + // BalancerAttributes. + // + // Soon, Addresses will be deprecated and replaced fully by Endpoints. Addresses []Address + // Endpoints is the latest set of resolved endpoints for the target. + // + // If a resolver produces a State containing Endpoints but not Addresses, + // it must take care to ensure the LB policies it selects will support + // Endpoints. + Endpoints []Endpoint + // ServiceConfig contains the result from parsing the latest service // config. If it is nil, it indicates no service config is present or the // resolver does not provide service configs. @@ -294,10 +323,3 @@ type Resolver interface { // Close closes the resolver. Close() } - -// UnregisterForTesting removes the resolver builder with the given scheme from the -// resolver map. -// This function is for testing only. -func UnregisterForTesting(scheme string) { - delete(m, scheme) -} diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index b408b3688f2..6d656d06d05 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -152,6 +152,14 @@ func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) // which includes addresses and service config. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { errCh := make(chan error, 1) + if s.Endpoints == nil { + s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses)) + for _, a := range s.Addresses { + ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes} + ep.Addresses[0].BalancerAttributes = nil + s.Endpoints = append(s.Endpoints, ep) + } + } ok := ccr.serializer.Schedule(func(context.Context) { ccr.addChannelzTraceEvent(s) ccr.curState = s diff --git a/resolver_test.go b/resolver_test.go index 5b1e40c2a3d..bde92f7b0bf 100644 --- a/resolver_test.go +++ b/resolver_test.go @@ -24,8 +24,13 @@ import ( "net" "testing" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" ) type wrapResolverBuilder struct { @@ -91,3 +96,49 @@ func (s) TestResolverCaseSensitivity(t *testing.T) { } cc.Close() } + +// TestResolverAddressesToEndpoints ensures one Endpoint is created for each +// entry in resolver.State.Addresses automatically. +func (s) TestResolverAddressesToEndpoints(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + const scheme = "testresolveraddressestoendpoints" + r := manual.NewBuilderWithScheme(scheme) + + stateCh := make(chan balancer.ClientConnState, 1) + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error { + stateCh <- ccs + return nil + }, + } + balancerName := "stub-balancer-" + scheme + stub.Register(balancerName, bf) + + a1 := attributes.New("x", "y") + a2 := attributes.New("a", "b") + r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: "addr1", BalancerAttributes: a1}, {Addr: "addr2", BalancerAttributes: a2}}}) + + cc, err := Dial(r.Scheme()+":///", + WithTransportCredentials(insecure.NewCredentials()), + WithResolvers(r), + WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, balancerName))) + if err != nil { + t.Fatalf("Unexpected error dialing: %v", err) + } + defer cc.Close() + + select { + case got := <-stateCh: + want := []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: "addr1"}}, Attributes: a1}, + {Addresses: []resolver.Address{{Addr: "addr2"}}, Attributes: a2}, + } + if diff := cmp.Diff(got.ResolverState.Endpoints, want); diff != "" { + t.Errorf("Did not receive expected endpoints. Diff (-got +want):\n%v", diff) + } + case <-ctx.Done(): + t.Fatalf("timed out waiting for endpoints") + } +} diff --git a/stream.go b/stream.go index de32a759714..e3bb3f20f38 100644 --- a/stream.go +++ b/stream.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" @@ -433,7 +434,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) ctx = trace.NewContext(ctx, trInfo.tr) } - if cs.cc.parsedTarget.URL.Scheme == "xds" { + if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata { // Add extra metadata (metadata that will be added by transport) to context // so the balancer can see them. ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs( diff --git a/test/balancer_test.go b/test/balancer_test.go index 6bbaed28097..7109b20d2e1 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/internal/grpcsync" @@ -196,14 +197,10 @@ func testPickExtraMetadata(t *testing.T, e env) { te.startServer(&testServer{security: e.security}) defer te.tearDown() - // Set resolver to xds to trigger the extra metadata code path. - r := manual.NewBuilderWithScheme("xds") - resolver.Register(r) - defer func() { - resolver.UnregisterForTesting("xds") - }() - r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}}) - te.resolverScheme = "xds" + // Trigger the extra-metadata-adding code path. + defer func(old string) { internal.GRPCResolverSchemeExtraMetadata = old }(internal.GRPCResolverSchemeExtraMetadata) + internal.GRPCResolverSchemeExtraMetadata = "passthrough" + cc := te.clientConn() tc := testgrpc.NewTestServiceClient(cc) diff --git a/xds/googledirectpath/googlec2p_test.go b/xds/googledirectpath/googlec2p_test.go index 44e1a68e238..945202fe71a 100644 --- a/xds/googledirectpath/googlec2p_test.go +++ b/xds/googledirectpath/googlec2p_test.go @@ -60,31 +60,13 @@ var ( ) func replaceResolvers() func() { - var registerForTesting bool - if resolver.Get(c2pScheme) == nil { - // If env var to enable c2p is not set, the resolver isn't registered. - // Need to register and unregister in defer. - registerForTesting = true - resolver.Register(&c2pResolverBuilder{}) - } oldDNS := resolver.Get("dns") resolver.Register(testDNSResolver) oldXDS := resolver.Get("xds") resolver.Register(testXDSResolver) return func() { - if oldDNS != nil { - resolver.Register(oldDNS) - } else { - resolver.UnregisterForTesting("dns") - } - if oldXDS != nil { - resolver.Register(oldXDS) - } else { - resolver.UnregisterForTesting("xds") - } - if registerForTesting { - resolver.UnregisterForTesting(c2pScheme) - } + resolver.Register(oldDNS) + resolver.Register(oldXDS) } } diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index 6faf81ab552..a5749aa9aff 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -253,8 +253,15 @@ func (b *clusterResolverBalancer) updateChildConfig() { } b.logger.Infof("Built child policy config: %v", pretty.ToJSON(childCfg)) + endpoints := make([]resolver.Endpoint, len(addrs)) + for i, a := range addrs { + endpoints[i].Attributes = a.BalancerAttributes + endpoints[i].Addresses = []resolver.Address{a} + endpoints[i].Addresses[0].BalancerAttributes = nil + } if err := b.child.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ + Endpoints: endpoints, Addresses: addrs, ServiceConfig: b.configRaw, Attributes: b.attrsWithClient, diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index b4740eea6d0..f99511f6f82 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -121,7 +121,6 @@ func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOp mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts } dnsResolverBuilder := resolver.Get("dns") - resolver.UnregisterForTesting("dns") resolver.Register(mr) return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index 0da74f628db..3b350f792ed 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -125,9 +125,21 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error { } dr.mu.Lock() - addrs := make([]string, len(state.Addresses)) - for i, a := range state.Addresses { - addrs[i] = a.Addr + var addrs []string + if len(state.Endpoints) > 0 { + // Assume 1 address per endpoint, which is how DNS is expected to + // behave. The slice will grow as needed, however. + addrs = make([]string, 0, len(state.Endpoints)) + for _, e := range state.Endpoints { + for _, a := range e.Addresses { + addrs = append(addrs, a.Addr) + } + } + } else { + addrs = make([]string, len(state.Addresses)) + for i, a := range state.Addresses { + addrs[i] = a.Addr + } } dr.addrs = addrs dr.updateReceived = true