From 822735151841bcaffe9cf6474eda7b13e808b30b Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 17 Nov 2020 14:32:38 -0800 Subject: [PATCH] [roundrobin_strips_attributes] roundrobin test --- balancer/base/balancer.go | 2 - balancer/roundrobin/roundrobin_test.go | 82 +++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index bdcc81af36d0..e0d34288ccfc 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -135,8 +135,6 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // The SubConn does a reflect.DeepEqual of the new and old // addresses. So this is a noop if the current address is the same // as the old one (including attributes). - // - // TODO: delete this when this balancer reads attributes. sc.UpdateAddresses([]resolver.Address{a}) } } diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index 5a8ba481c9f0..e0d0100d2657 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -32,6 +32,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpctest" + imetadata "google.golang.org/grpc/internal/metadata" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -39,6 +41,10 @@ import ( testpb "google.golang.org/grpc/test/grpc_testing" ) +const ( + testMDKey = "test-md" +) + type s struct { grpctest.Tester } @@ -49,9 +55,23 @@ func Test(t *testing.T) { type testServer struct { testpb.UnimplementedTestServiceServer + + testMDChan chan []string +} + +func newTestServer() *testServer { + return &testServer{testMDChan: make(chan []string, 1)} } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if ok && len(md[testMDKey]) != 0 { + select { + case s.testMDChan <- md[testMDKey]: + case <-ctx.Done(): + return nil, ctx.Err() + } + } return &testpb.Empty{}, nil } @@ -60,8 +80,9 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ } type test struct { - servers []*grpc.Server - addresses []string + servers []*grpc.Server + serverImpls []*testServer + addresses []string } func (t *test) cleanup() { @@ -85,8 +106,10 @@ func startTestServers(count int) (_ *test, err error) { } s := grpc.NewServer() - testpb.RegisterTestServiceServer(s, &testServer{}) + sImpl := newTestServer() + testpb.RegisterTestServiceServer(s, sImpl) t.servers = append(t.servers, s) + t.serverImpls = append(t.serverImpls, sImpl) t.addresses = append(t.addresses, lis.Addr().String()) go func(s *grpc.Server, l net.Listener) { @@ -473,3 +496,56 @@ func (s) TestAllServersDown(t *testing.T) { } t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped") } + +func (s) TestUpdateAddressAttributes(t *testing.T) { + r := manual.NewBuilderWithScheme("whatever") + + test, err := startTestServers(1) + if err != nil { + t.Fatalf("failed to start servers: %v", err) + } + defer test.cleanup() + + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithResolvers(r), grpc.WithBalancerName(roundrobin.Name)) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + testc := testpb.NewTestServiceClient(cc) + // The first RPC should fail because there's no address. + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { + t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) + } + + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) + // The second RPC should succeed. + if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = _, %v, want _, ", err) + } + // The first RPC should not set metadata, so there's no md in the channel. + select { + case md1 := <-test.serverImpls[0].testMDChan: + if len(md1) != 0 { + t.Fatalf("got md: %v, want empty metadata", md1) + } + case <-time.After(time.Microsecond * 100): + } + + const testMDValue = "test-md-value" + // Update metadata in address. + r.UpdateState(resolver.State{Addresses: []resolver.Address{ + imetadata.Set(resolver.Address{Addr: test.addresses[0]}, metadata.Pairs(testMDKey, testMDValue)), + }}) + // The second RPC should succeed. + if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = _, %v, want _, ", err) + } + + // The second RPC should send metadata with it. + md2 := <-test.serverImpls[0].testMDChan + if len(md2) == 0 || md2[0] != testMDValue { + t.Fatalf("got md: %v, want %v", md2, []string{testMDValue}) + } +}