From 3330db06b43a8c5cf9def1ce65e61a99a6d586b7 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 17 Nov 2020 14:32:59 -0800 Subject: [PATCH 1/3] [rpc_md_in_address] balancer: set RPC metadata in address attributes, instead of Metadata field This metadata will be sent with all RPCs on the created SubConn --- balancer/grpclb/grpclb_remote_balancer.go | 8 +- balancer/grpclb/grpclb_test.go | 2 +- balancer/grpclb/grpclb_util.go | 2 +- internal/metadata/metadata.go | 58 +++++++++++++++ internal/metadata/metadata_test.go | 86 ++++++++++++++++++++++ internal/transport/http2_client.go | 24 +++--- test/balancer_test.go | 90 +++++++++++++++++++++++ 7 files changed, 253 insertions(+), 17 deletions(-) create mode 100644 internal/metadata/metadata.go create mode 100644 internal/metadata/metadata_test.go diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 8fdda09034f..102eb3831c0 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" + imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -76,10 +77,7 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { // net.SplitHostPort() will return too many colons error. ipStr = fmt.Sprintf("[%s]", ipStr) } - addr := resolver.Address{ - Addr: fmt.Sprintf("%s:%d", ipStr, s.Port), - Metadata: &md, - } + addr := imetadata.Set(resolver.Address{Addr: fmt.Sprintf("%s:%d", ipStr, s.Port)}, md) if logger.V(2) { logger.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|", i, ipStr, s.Port, s.LoadBalanceToken) @@ -164,7 +162,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback // Create new SubConns. for _, addr := range backendAddrs { addrWithoutMD := addr - addrWithoutMD.Metadata = nil + addrWithoutMD.Attributes = nil addrsSet[addrWithoutMD] = struct{}{} lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutMD) diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index dc94ca87784..9cbb338c241 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -303,7 +303,7 @@ func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.E if !ok { return nil, status.Error(codes.Internal, "failed to receive metadata") } - if !s.fallback && (md == nil || md["lb-token"][0] != lbToken) { + if !s.fallback && (md == nil || len(md["lb-token"]) == 0 || md["lb-token"][0] != lbToken) { return nil, status.Errorf(codes.Internal, "received unexpected metadata: %v", md) } grpc.SetTrailer(ctx, metadata.Pairs(testmdkey, s.addr)) diff --git a/balancer/grpclb/grpclb_util.go b/balancer/grpclb/grpclb_util.go index 636725e5416..cf5dbdc3ede 100644 --- a/balancer/grpclb/grpclb_util.go +++ b/balancer/grpclb/grpclb_util.go @@ -125,7 +125,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs)) } addrWithoutMD := addrs[0] - addrWithoutMD.Metadata = nil + addrWithoutMD.Attributes = nil ccc.mu.Lock() defer ccc.mu.Unlock() diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go new file mode 100644 index 00000000000..72430c0ca27 --- /dev/null +++ b/internal/metadata/metadata.go @@ -0,0 +1,58 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package metadata contains functions to set and get metadata from addresses. +// +// This package is experimental. +package metadata + +import ( + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/resolver" +) + +type mdKeyType string + +const mdKey = mdKeyType("grpc.internal.address.metadata") + +// Get returns the metadata of addr. +func Get(addr resolver.Address) metadata.MD { + attrs := addr.Attributes + if attrs == nil { + return nil + } + md, ok := attrs.Value(mdKey).(metadata.MD) + if !ok { + return nil + } + return md +} + +// Set sets (overrides) the metadata in addr. +// +// When a SubConn is created with this address, the RPCs sent on it will all +// have this metadata. +func Set(addr resolver.Address, md metadata.MD) resolver.Address { + if addr.Attributes == nil { + addr.Attributes = attributes.New(mdKey, md) + return addr + } + addr.Attributes = addr.Attributes.WithValues(mdKey, md) + return addr +} diff --git a/internal/metadata/metadata_test.go b/internal/metadata/metadata_test.go new file mode 100644 index 00000000000..68c2ca5808c --- /dev/null +++ b/internal/metadata/metadata_test.go @@ -0,0 +1,86 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package metadata + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/resolver" +) + +func TestGet(t *testing.T) { + tests := []struct { + name string + addr resolver.Address + want metadata.MD + }{ + { + name: "not set", + addr: resolver.Address{}, + want: nil, + }, + { + name: "not set", + addr: resolver.Address{ + Attributes: attributes.New(mdKey, metadata.Pairs("k", "v")), + }, + want: metadata.Pairs("k", "v"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := Get(tt.addr); !cmp.Equal(got, tt.want) { + t.Errorf("Get() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSet(t *testing.T) { + tests := []struct { + name string + addr resolver.Address + md metadata.MD + }{ + { + name: "unset before", + addr: resolver.Address{}, + md: metadata.Pairs("k", "v"), + }, + { + name: "set before", + addr: resolver.Address{ + Attributes: attributes.New(mdKey, metadata.Pairs("bef", "ore")), + }, + md: metadata.Pairs("k", "v"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + newAddr := Set(tt.addr, tt.md) + newMD := Get(newAddr) + if !cmp.Equal(newMD, tt.md) { + t.Errorf("md after Set() = %v, want %v", newMD, tt.md) + } + }) + } +} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index fef365c0d28..4778ed16252 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -33,6 +33,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/internal/grpcutil" + imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/codes" @@ -60,7 +61,7 @@ type http2Client struct { cancel context.CancelFunc ctxDone <-chan struct{} // Cache the ctx.Done() chan. userAgent string - md interface{} + md metadata.MD conn net.Conn // underlying communication channel loopy *loopyWriter remoteAddr net.Addr @@ -268,7 +269,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ctxDone: ctx.Done(), // Cache Done chan. cancel: cancel, userAgent: opts.UserAgent, - md: addr.Metadata, conn: conn, remoteAddr: conn.RemoteAddr(), localAddr: conn.LocalAddr(), @@ -296,6 +296,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts keepaliveEnabled: keepaliveEnabled, bufferPool: newBufferPool(), } + + if md, ok := addr.Metadata.(*metadata.MD); ok { + t.md = *md + } else if md := imetadata.Get(addr); md != nil { + t.md = md + } t.controlBuf = newControlBuffer(t.ctxDone) if opts.InitialWindowSize >= defaultWindowSize { t.initialWindowSize = opts.InitialWindowSize @@ -512,14 +518,12 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) } } } - if md, ok := t.md.(*metadata.MD); ok { - for k, vv := range *md { - if isReservedHeader(k) { - continue - } - for _, v := range vv { - headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) - } + for k, vv := range t.md { + if isReservedHeader(k) { + continue + } + for _, v := range vv { + headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) } } return headerFields, nil diff --git a/test/balancer_test.go b/test/balancer_test.go index f0189cf2644..b8fc14751b1 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -39,6 +39,7 @@ import ( "google.golang.org/grpc/internal/balancerload" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcutil" + imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" @@ -543,6 +544,95 @@ func (s) TestAddressAttributesInNewSubConn(t *testing.T) { } } +// TestMetadataInAddressAttributes verifies that the metadata added to +// address.Attributes will be sent with the RPCs. +func (s) TestMetadataInAddressAttributes(t *testing.T) { + const ( + testMDKey = "test-md" + testMDValue = "test-md-value" + mdBalancerName = "metadata-balancer" + ) + + // Register a stub balancer which adds metadata to the first address that it + // receives and then calls NewSubConn on it. + bf := stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + addrs := ccs.ResolverState.Addresses + if len(addrs) == 0 { + return nil + } + + // Only use the first address. + sc, err := bd.ClientConn.NewSubConn([]resolver.Address{ + imetadata.Set(addrs[0], metadata.Pairs(testMDKey, testMDValue)), + }, balancer.NewSubConnOptions{}) + if err != nil { + return err + } + sc.Connect() + return nil + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + bd.ClientConn.UpdateState(balancer.State{ConnectivityState: state.ConnectivityState, Picker: &aiPicker{result: balancer.PickResult{SubConn: sc}, err: state.ConnectionError}}) + }, + } + stub.Register(mdBalancerName, bf) + t.Logf("Registered balancer %s...", mdBalancerName) + + r := manual.NewBuilderWithScheme("whatever") + t.Logf("Registered manual resolver with scheme %s...", r.Scheme()) + + testMDChan := make(chan []string, 1) + ss := &stubServer{ + emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if ok { + select { + case <-testMDChan: + default: + } + testMDChan <- md[testMDKey] + } + return &testpb.Empty{}, nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + dopts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, mdBalancerName)), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatal(err) + } + defer cc.Close() + tc := testpb.NewTestServiceClient(cc) + t.Log("Created a ClientConn...") + + state := resolver.State{Addresses: []resolver.Address{{Addr: ss.address}}} + r.UpdateState(state) + t.Logf("Pushing resolver state update: %v through the manual resolver", state) + + // The RPC should succeed with the expected md. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = _, %v, want _, ", err) + } + t.Log("Made an RPC which succeeded...") + + // The server should receive the test metadata. + md1 := <-testMDChan + if len(md1) == 0 || md1[0] != testMDValue { + t.Fatalf("got md: %v, want %v", md1, []string{testMDValue}) + } +} + // TestServersSwap creates two servers and verifies the client switches between // them when the name resolver reports the first and then the second. func (s) TestServersSwap(t *testing.T) { From 7348d5ec744cc56e5bdc6605fa2502c1b26d1775 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 18 Nov 2020 09:50:01 -0800 Subject: [PATCH 2/3] [rpc_md_in_address] c1 --- balancer/grpclb/grpclb_remote_balancer.go | 12 ++++++------ balancer/grpclb/grpclb_util.go | 10 +++++----- internal/hierarchy/hierarchy.go | 10 +--------- internal/metadata/metadata.go | 10 +--------- test/balancer_test.go | 10 ++++------ 5 files changed, 17 insertions(+), 35 deletions(-) diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 102eb3831c0..08d326ab40b 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -161,19 +161,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback addrsSet := make(map[resolver.Address]struct{}) // Create new SubConns. for _, addr := range backendAddrs { - addrWithoutMD := addr - addrWithoutMD.Attributes = nil - addrsSet[addrWithoutMD] = struct{}{} - lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutMD) + addrWithoutAttrs := addr + addrWithoutAttrs.Attributes = nil + addrsSet[addrWithoutAttrs] = struct{}{} + lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutAttrs) - if _, ok := lb.subConns[addrWithoutMD]; !ok { + if _, ok := lb.subConns[addrWithoutAttrs]; !ok { // Use addrWithMD to create the SubConn. sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts) if err != nil { logger.Warningf("grpclb: failed to create new SubConn: %v", err) continue } - lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map. + lb.subConns[addrWithoutAttrs] = sc // Use the addr without MD as key for the map. if _, ok := lb.scStates[sc]; !ok { // Only set state of new sc to IDLE. The state could already be // READY for cached SubConns. diff --git a/balancer/grpclb/grpclb_util.go b/balancer/grpclb/grpclb_util.go index cf5dbdc3ede..373f04b98d3 100644 --- a/balancer/grpclb/grpclb_util.go +++ b/balancer/grpclb/grpclb_util.go @@ -124,16 +124,16 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer if len(addrs) != 1 { return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs)) } - addrWithoutMD := addrs[0] - addrWithoutMD.Attributes = nil + addrWithoutAttrs := addrs[0] + addrWithoutAttrs.Attributes = nil ccc.mu.Lock() defer ccc.mu.Unlock() - if entry, ok := ccc.subConnCache[addrWithoutMD]; ok { + if entry, ok := ccc.subConnCache[addrWithoutAttrs]; ok { // If entry is in subConnCache, the SubConn was being deleted. // cancel function will never be nil. entry.cancel() - delete(ccc.subConnCache, addrWithoutMD) + delete(ccc.subConnCache, addrWithoutAttrs) return entry.sc, nil } @@ -142,7 +142,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer return nil, err } - ccc.subConnToAddr[scNew] = addrWithoutMD + ccc.subConnToAddr[scNew] = addrWithoutAttrs return scNew, nil } diff --git a/internal/hierarchy/hierarchy.go b/internal/hierarchy/hierarchy.go index 17185d95d38..a2f990f552e 100644 --- a/internal/hierarchy/hierarchy.go +++ b/internal/hierarchy/hierarchy.go @@ -23,7 +23,6 @@ package hierarchy import ( - "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" ) @@ -37,19 +36,12 @@ func Get(addr resolver.Address) []string { if attrs == nil { return nil } - path, ok := attrs.Value(pathKey).([]string) - if !ok { - return nil - } + path, _ := attrs.Value(pathKey).([]string) return path } // Set overrides the hierarchical path in addr with path. func Set(addr resolver.Address, path []string) resolver.Address { - if addr.Attributes == nil { - addr.Attributes = attributes.New(pathKey, path) - return addr - } addr.Attributes = addr.Attributes.WithValues(pathKey, path) return addr } diff --git a/internal/metadata/metadata.go b/internal/metadata/metadata.go index 72430c0ca27..302262613a0 100644 --- a/internal/metadata/metadata.go +++ b/internal/metadata/metadata.go @@ -22,7 +22,6 @@ package metadata import ( - "google.golang.org/grpc/attributes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" ) @@ -37,10 +36,7 @@ func Get(addr resolver.Address) metadata.MD { if attrs == nil { return nil } - md, ok := attrs.Value(mdKey).(metadata.MD) - if !ok { - return nil - } + md, _ := attrs.Value(mdKey).(metadata.MD) return md } @@ -49,10 +45,6 @@ func Get(addr resolver.Address) metadata.MD { // When a SubConn is created with this address, the RPCs sent on it will all // have this metadata. func Set(addr resolver.Address, md metadata.MD) resolver.Address { - if addr.Attributes == nil { - addr.Attributes = attributes.New(mdKey, md) - return addr - } addr.Attributes = addr.Attributes.WithValues(mdKey, md) return addr } diff --git a/test/balancer_test.go b/test/balancer_test.go index b8fc14751b1..4ab69ea3595 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -579,19 +579,16 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) { stub.Register(mdBalancerName, bf) t.Logf("Registered balancer %s...", mdBalancerName) - r := manual.NewBuilderWithScheme("whatever") - t.Logf("Registered manual resolver with scheme %s...", r.Scheme()) - testMDChan := make(chan []string, 1) ss := &stubServer{ emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { select { - case <-testMDChan: - default: + case testMDChan <- md[testMDKey]: + case <-ctx.Done(): + return nil, ctx.Err() } - testMDChan <- md[testMDKey] } return &testpb.Empty{}, nil }, @@ -601,6 +598,7 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) { } defer ss.Stop() + r := manual.NewBuilderWithScheme("whatever") dopts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithResolvers(r), From c1d674ce9645ea17c53dd6b9e11d7840d1f86a53 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 18 Nov 2020 14:58:36 -0800 Subject: [PATCH 3/3] [rpc_md_in_address] dumb --- test/balancer_test.go | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/test/balancer_test.go b/test/balancer_test.go index 4ab69ea3595..7af5c81a001 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -561,7 +561,6 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) { if len(addrs) == 0 { return nil } - // Only use the first address. sc, err := bd.ClientConn.NewSubConn([]resolver.Address{ imetadata.Set(addrs[0], metadata.Pairs(testMDKey, testMDValue)), @@ -593,33 +592,17 @@ func (s) TestMetadataInAddressAttributes(t *testing.T) { return &testpb.Empty{}, nil }, } - if err := ss.Start(nil); err != nil { + if err := ss.Start(nil, grpc.WithDefaultServiceConfig( + fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, mdBalancerName), + )); err != nil { t.Fatalf("Error starting endpoint server: %v", err) } defer ss.Stop() - r := manual.NewBuilderWithScheme("whatever") - dopts := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, mdBalancerName)), - } - cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) - if err != nil { - t.Fatal(err) - } - defer cc.Close() - tc := testpb.NewTestServiceClient(cc) - t.Log("Created a ClientConn...") - - state := resolver.State{Addresses: []resolver.Address{{Addr: ss.address}}} - r.UpdateState(state) - t.Logf("Pushing resolver state update: %v through the manual resolver", state) - // The RPC should succeed with the expected md. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil { t.Fatalf("EmptyCall() = _, %v, want _, ", err) } t.Log("Made an RPC which succeeded...")